Home | History | Annotate | Download | only in test
      1 """Unit tests for the io module."""
      2 
      3 # Tests of io are scattered over the test suite:
      4 # * test_bufio - tests file buffering
      5 # * test_memoryio - tests BytesIO and StringIO
      6 # * test_fileio - tests FileIO
      7 # * test_file - tests the file interface
      8 # * test_io - tests everything else in the io module
      9 # * test_univnewlines - tests universal newline support
     10 # * test_largefile - tests operations on a file greater than 2**32 bytes
     11 #     (only enabled with -ulargefile)
     12 
     13 ################################################################################
     14 # ATTENTION TEST WRITERS!!!
     15 ################################################################################
     16 # When writing tests for io, it's important to test both the C and Python
     17 # implementations. This is usually done by writing a base test that refers to
     18 # the type it is testing as an attribute. Then it provides custom subclasses to
     19 # test both implementations. This file has lots of examples.
     20 ################################################################################
     21 
     22 from __future__ import print_function
     23 from __future__ import unicode_literals
     24 
     25 import os
     26 import sys
     27 import time
     28 import array
     29 import random
     30 import unittest
     31 import weakref
     32 import warnings
     33 import abc
     34 import signal
     35 import errno
     36 from itertools import cycle, count
     37 from collections import deque
     38 from UserList import UserList
     39 from test import test_support as support
     40 import contextlib
     41 
     42 import codecs
     43 import io  # C implementation of io
     44 import _pyio as pyio # Python implementation of io
     45 try:
     46     import threading
     47 except ImportError:
     48     threading = None
     49 try:
     50     import fcntl
     51 except ImportError:
     52     fcntl = None
     53 
# Python 2: classes declared in this module without explicit bases become
# new-style classes.
__metaclass__ = type
# From here on the module-level name "bytes" refers to support.py3k_bytes.
bytes = support.py3k_bytes
     56 
     57 def byteslike(*pos, **kw):
     58     return memoryview(bytearray(*pos, **kw))
     59 
     60 def _default_chunk_size():
     61     """Get the default TextIOWrapper chunk size"""
     62     with io.open(__file__, "r", encoding="latin1") as f:
     63         return f._CHUNK_SIZE
     64 
     65 
     66 class MockRawIOWithoutRead:
     67     """A RawIO implementation without read(), so as to exercise the default
     68     RawIO.read() which calls readinto()."""
     69 
     70     def __init__(self, read_stack=()):
     71         self._read_stack = list(read_stack)
     72         self._write_stack = []
     73         self._reads = 0
     74         self._extraneous_reads = 0
     75 
     76     def write(self, b):
     77         self._write_stack.append(bytes(b))
     78         return len(b)
     79 
     80     def writable(self):
     81         return True
     82 
     83     def fileno(self):
     84         return 42
     85 
     86     def readable(self):
     87         return True
     88 
     89     def seekable(self):
     90         return True
     91 
     92     def seek(self, pos, whence):
     93         return 0   # wrong but we gotta return something
     94 
     95     def tell(self):
     96         return 0   # same comment as above
     97 
     98     def readinto(self, buf):
     99         self._reads += 1
    100         max_len = len(buf)
    101         try:
    102             data = self._read_stack[0]
    103         except IndexError:
    104             self._extraneous_reads += 1
    105             return 0
    106         if data is None:
    107             del self._read_stack[0]
    108             return None
    109         n = len(data)
    110         if len(data) <= max_len:
    111             del self._read_stack[0]
    112             buf[:n] = data
    113             return n
    114         else:
    115             buf[:] = data[:max_len]
    116             self._read_stack[0] = data[max_len:]
    117             return max_len
    118 
    119     def truncate(self, pos=None):
    120         return pos
    121 
# MockRawIOWithoutRead combined with RawIOBase from io (the C implementation).
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass
    124 
# MockRawIOWithoutRead combined with RawIOBase from _pyio (the Python
# implementation).
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
    127 
    128 
    129 class MockRawIO(MockRawIOWithoutRead):
    130 
    131     def read(self, n=None):
    132         self._reads += 1
    133         try:
    134             return self._read_stack.pop(0)
    135         except:
    136             self._extraneous_reads += 1
    137             return b""
    138 
# MockRawIO combined with RawIOBase from io (the C implementation).
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass
    141 
# MockRawIO combined with RawIOBase from _pyio (the Python implementation).
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
    144 
    145 
    146 class MisbehavedRawIO(MockRawIO):
    147     def write(self, b):
    148         return MockRawIO.write(self, b) * 2
    149 
    150     def read(self, n=None):
    151         return MockRawIO.read(self, n) * 2
    152 
    153     def seek(self, pos, whence):
    154         return -123
    155 
    156     def tell(self):
    157         return -456
    158 
    159     def readinto(self, buf):
    160         MockRawIO.readinto(self, buf)
    161         return len(buf) * 5
    162 
# MisbehavedRawIO combined with RawIOBase from io (the C implementation).
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass
    165 
# MisbehavedRawIO combined with RawIOBase from _pyio (the Python
# implementation).
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
    168 
    169 
    170 class CloseFailureIO(MockRawIO):
    171     closed = 0
    172 
    173     def close(self):
    174         if not self.closed:
    175             self.closed = 1
    176             raise IOError
    177 
# CloseFailureIO combined with RawIOBase from io (the C implementation).
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass
    180 
# CloseFailureIO combined with RawIOBase from _pyio (the Python
# implementation).
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
    183 
    184 
    185 class MockFileIO:
    186 
    187     def __init__(self, data):
    188         self.read_history = []
    189         super(MockFileIO, self).__init__(data)
    190 
    191     def read(self, n=None):
    192         res = super(MockFileIO, self).read(n)
    193         self.read_history.append(None if res is None else len(res))
    194         return res
    195 
    196     def readinto(self, b):
    197         res = super(MockFileIO, self).readinto(b)
    198         self.read_history.append(res)
    199         return res
    200 
# MockFileIO layered over BytesIO from io (the C implementation).
class CMockFileIO(MockFileIO, io.BytesIO):
    pass
    203 
# MockFileIO layered over BytesIO from _pyio (the Python implementation).
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
    206 
    207 
    208 class MockNonBlockWriterIO:
    209 
    210     def __init__(self):
    211         self._write_stack = []
    212         self._blocker_char = None
    213 
    214     def pop_written(self):
    215         s = b"".join(self._write_stack)
    216         self._write_stack[:] = []
    217         return s
    218 
    219     def block_on(self, char):
    220         """Block when a given char is encountered."""
    221         self._blocker_char = char
    222 
    223     def readable(self):
    224         return True
    225 
    226     def seekable(self):
    227         return True
    228 
    229     def writable(self):
    230         return True
    231 
    232     def write(self, b):
    233         b = bytes(b)
    234         n = -1
    235         if self._blocker_char:
    236             try:
    237                 n = b.index(self._blocker_char)
    238             except ValueError:
    239                 pass
    240             else:
    241                 if n > 0:
    242                     # write data up to the first blocker
    243                     self._write_stack.append(b[:n])
    244                     return n
    245                 else:
    246                     # cancel blocker and indicate would block
    247                     self._blocker_char = None
    248                     return None
    249         self._write_stack.append(b)
    250         return len(b)
    251 
# MockNonBlockWriterIO combined with RawIOBase from io (the C implementation).
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError type of the same implementation, exposed on the class.
    BlockingIOError = io.BlockingIOError
    254 
# MockNonBlockWriterIO combined with RawIOBase from _pyio (the Python
# implementation).
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError type of the same implementation, exposed on the class.
    BlockingIOError = pyio.BlockingIOError
    257 
    258 
    259 class IOTest(unittest.TestCase):
    260 
    def setUp(self):
        # Call support.unlink on the shared scratch path before each test.
        support.unlink(support.TESTFN)
    263 
    def tearDown(self):
        # Call support.unlink on the shared scratch path after each test.
        support.unlink(support.TESTFN)
    266 
    def write_ops(self, f):
        # Shared helper: run a fixed write/seek/tell/truncate script against
        # the writable binary stream *f* and check every reported value.
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating to 0 is expected to leave the stream position at 5.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # whence=1: step one byte back from the current position.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # whence=2: one byte before the end of the 14 bytes written so far.
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # truncate(12) returns the new size and again leaves the position
        # where it was.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # A float offset must be rejected with TypeError.
        self.assertRaises(TypeError, f.seek, 0.0)
    290 
    def read_ops(self, f, buffered=False):
        # Shared helper: *f* must yield b"hello world\n".  Checks read(),
        # readinto(), seek() and tell(); with buffered=True it additionally
        # checks read() called without a size argument.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Wrap a copy of the 5 bytes just read as the target for readinto().
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data.tobytes(), b" worl")
        data = bytearray(5)
        # Only 2 bytes remain: the 5-byte buffer is filled partially and
        # keeps its length.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At end of stream both read() and readinto() report no data.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-length requests yield empty results.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # A float offset must be rejected with TypeError.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            # read() without a size returns everything up to end of stream.
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
    319 
    320     LARGE = 2**31
    321 
    def large_file_ops(self, f):
        # Shared helper: exercise seek/tell/write/truncate/read on *f* at
        # offsets around self.LARGE.  *f* must be open for both reading and
        # writing.
        # The preconditions use unittest assertions instead of bare "assert"
        # statements, which are removed when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        # truncate() without an argument cuts at the current position.
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        # An explicit truncate size shortens the file but is expected to
        # leave the stream position unchanged.
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        # Only one byte is left past offset LARGE, so read(2) returns b"x".
        self.assertEqual(f.read(2), b"x")
    337         self.assertEqual(f.read(2), b"x")
    338 
    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        # Write-only streams (text and binary) must refuse to read.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        # A read-only binary stream must refuse to write.
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        # A read-only text stream must refuse to write as well.
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])
    351 
    def test_open_handles_NUL_chars(self):
        # A file name with an embedded NUL character must make open() raise
        # TypeError, whether the name is passed as text or as bytes.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')

        bytes_fn = fn_with_NUL.encode('ascii')
        # DeprecationWarning is silenced while the bytes name is tried.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertRaises(TypeError, self.open, bytes_fn, 'w')
    360 
    def test_raw_file_io(self):
        # Files opened with buffering=0: check the capability flags for each
        # mode, then run the shared write and read scripts.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)
    372 
    def test_buffered_file_io(self):
        # Same scenario as the buffering=0 test but with the default
        # buffering; read_ops() is asked to run its buffered-only checks too.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)
    384 
    def test_readline(self):
        # readline() with no limit, a limit beyond the line end, a limit
        # inside a line, a line containing NUL, and limit=None.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A limit of 2 splits "xyzzy\n"; the next call returns the rest.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            # A float limit must be rejected in binary mode...
            self.assertRaises(TypeError, f.readline, 5.3)
        # ...and in text mode.
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)
    398 
    def test_raw_bytes_io(self):
        # Run the shared write script on an empty BytesIO, check the
        # resulting value, then run the read script on a BytesIO built from
        # that value.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)
    406 
    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        # First with buffering disabled (third argument 0), then with the
        # default buffering.
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)
    419 
    def test_with_open(self):
        # The with statement must close the file both on normal exit and
        # when the body raises, for each tested buffer size.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    # Deliberately raise ZeroDivisionError inside the block.
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")
    434 
    435     # issue 5008
    def test_append_mode_tell(self):
        # After writing 3 bytes, a file reopened in append mode must report
        # a position past the start right away.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        # In text mode only a position greater than zero is required.
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)
    445 
    def test_destructor(self):
        # Dropping the last reference to a FileIO subclass must run __del__,
        # close() and flush() in that order, and the written data must end
        # up in the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
    470 
    def _check_base_destructor(self, base):
        # Helper: subclass *base*, drop the only instance and check that
        # __del__, close() and flush() ran in that order and could still
        # read the instance attributes.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # The base class may or may not define __del__.
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
    500 
    def test_IOBase_destructor(self):
        # Destructor ordering check for a direct IOBase subclass.
        self._check_base_destructor(self.IOBase)
    503 
    def test_RawIOBase_destructor(self):
        # Destructor ordering check for a direct RawIOBase subclass.
        self._check_base_destructor(self.RawIOBase)
    506 
    def test_BufferedIOBase_destructor(self):
        # Destructor ordering check for a direct BufferedIOBase subclass.
        self._check_base_destructor(self.BufferedIOBase)
    509 
    def test_TextIOBase_destructor(self):
        # Destructor ordering check for a direct TextIOBase subclass.
        self._check_base_destructor(self.TextIOBase)
    512 
    def test_close_flushes(self):
        # Data written before the with block closes the file must be
        # readable through a fresh handle afterwards.
        with self.open(support.TESTFN, "wb") as writer:
            writer.write(b"xxx")
        with self.open(support.TESTFN, "rb") as reader:
            content = reader.read()
        self.assertEqual(content, b"xxx")
    517             self.assertEqual(f.read(), b"xxx")
    518 
    def test_array_writes(self):
        # Writing an array object must report the number of bytes in its
        # serialized form, with buffering disabled and with the default.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)
    526 
    def test_closefd(self):
        # Opening a path name for writing with closefd=False must be
        # rejected with ValueError.
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', closefd=False)
    530 
    def test_read_closed(self):
        # A second text stream opened on the same descriptor with
        # closefd=False can read the data; after it is closed, read() must
        # raise ValueError.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)
    540 
    def test_no_closefd_with_filename(self):
        # closefd=False cannot be combined with a file name: opening the
        # path for reading that way must raise ValueError.
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, "r", closefd=False)
    544 
    def test_closefd_attr(self):
        # The closefd attribute of the underlying raw object (buffer.raw)
        # must reflect how the stream was opened.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Opened by name: closefd is True.
            self.assertEqual(f.buffer.raw.closefd, True)
            # Opened from the descriptor with closefd=False: it is False.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
    552 
    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object now sits in a reference cycle.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # The weak reference must be dead once collection has run.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
    565 
    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        # The test is skipped unless sys.maxsize fits in 32 bits and the
        # configured memory limit is at least 2GB.
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        # read() without a size must raise OverflowError for the
        # buffering=0, default-buffered binary and text streams alike.
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
    581 
    def check_flush_error_on_close(self, *args, **kwargs):
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        # *args / **kwargs are forwarded unchanged to self.open().
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            # Record whether the file was already closed when flush() ran,
            # then fail.
            closed[:] = [f.closed]
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop
    596 
    def test_flush_error_on_close(self):
        # Each layer is checked three ways: opened by name, opened from a
        # descriptor, and opened from a descriptor with closefd=False.  In
        # the last case the test closes the descriptor itself afterwards.
        # raw file
        # Issue #5700: io.FileIO calls flush() after file closed
        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', buffering=0)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
        os.close(fd)
        # buffered io
        self.check_flush_error_on_close(support.TESTFN, 'wb')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', closefd=False)
        os.close(fd)
        # text io
        self.check_flush_error_on_close(support.TESTFN, 'w')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'w')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'w', closefd=False)
        os.close(fd)
    620 
    def test_multi_close(self):
        # close() may be called repeatedly without error; flush() on the
        # closed file must then raise ValueError.
        stream = self.open(support.TESTFN, "wb", buffering=0)
        for _ in range(3):
            stream.close()
        with self.assertRaises(ValueError):
            stream.flush()
    627 
    def test_RawIOBase_read(self):
        # Drive the read() inherited from RawIOBase, which is implemented
        # through readinto(), with a scripted raw stream.  Each entry below
        # is the expected result of one successive read(2) call.
        script = (b"abc", b"d", None, b"efg", None)
        rawio = self.MockRawIOWithoutRead(script)
        expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
        for expected in expected_results:
            self.assertEqual(rawio.read(2), expected)
    639         self.assertEqual(rawio.read(2), b"")
    640 
    def test_fileio_closefd(self):
        # Issue #4841
        # A FileIO created with closefd=False must never close descriptors
        # it does not own, neither when re-initialized nor when closed.
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
    652 
    def test_nonbuffered_textio(self):
        # Text mode with buffering=0 must raise ValueError, and no warning
        # may be recorded during the failed open or the following
        # garbage collection.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])
    659 
    def test_invalid_newline(self):
        # An invalid newline argument must raise ValueError, and no warning
        # may be recorded during the failed open or the following
        # garbage collection.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
    666 
    def test_buffered_readinto_mixin(self):
        # Test the implementation provided by BufferedIOBase
        # The subclass only defines read(); readinto() is inherited and must
        # copy the 5 bytes read() returns into the caller's buffer.
        class Stream(self.BufferedIOBase):
            def read(self, size):
                return b"12345"
        stream = Stream()
        buffer = byteslike(5)
        self.assertEqual(stream.readinto(buffer), 5)
        self.assertEqual(buffer.tobytes(), b"12345")
    676 
    677 
# IOTest subclass carrying an additional finalization regression test.
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance in a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop both the class and the instance, then force a collection.
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
    697 
# IOTest subclass in which test_array_writes is replaced by a skipped
# version; the skip reason is the string passed to unittest.skip below.
class PyIOTest(IOTest):
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
    702 
    703 
    704 class CommonBufferedTests:
    705     # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    706 
    def test_detach(self):
        # detach() returns the raw stream the first time and raises
        # ValueError when called again.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work
    714 
    def test_fileno(self):
        # fileno() on the buffered object is expected to report 42, the
        # value returned by the mock raw streams in this file.
        bufio = self.tp(self.MockRawIO())
        descriptor = bufio.fileno()
        self.assertEqual(42, descriptor)
    720 
    def test_invalid_args(self):
        # seek() must reject whence values outside the accepted set with
        # ValueError.
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, bufio.seek, 0, bad_whence)
    727 
    def test_override_destructor(self):
        # Overridden __del__, close() and flush() on a subclass of the
        # tested buffered type must run, in that order, when the object is
        # destroyed.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Remember writability before the object is dropped.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])
    755 
    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Helper so the same with statement can be run twice.
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)
    767 
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # Accessing a missing attribute on a temporary buffered object
            # raises AttributeError.
            self.tp(rawio).xyzzy
        # stderr is captured so anything printed meanwhile can be inspected.
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)
    782 
    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's name
        # once one has been set (text and bytes names are shown differently).
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        named_cases = (
            ("dummy", "<%s name=u'dummy'>"),
            (b"dummy", "<%s name='dummy'>"),
        )
        for name, template in named_cases:
            raw.name = name
            self.assertEqual(repr(buffered), template % qualified)
    792 
    def test_flush_error_on_close(self):
        # close() must still close both objects when flush() fails, must
        # propagate the error, and must call flush() before closing anything.
        raw = self.MockRawIO()
        seen_at_flush = []
        def bad_flush():
            # Snapshot the closed flags at the moment flush() runs.
            seen_at_flush[:] = [buffered.closed, raw.closed]
            raise IOError()
        raw.flush = bad_flush
        buffered = self.tp(raw)
        self.assertRaises(IOError, buffered.close) # exception not swallowed
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(seen_at_flush)      # flush() called
        buffered_was_closed, raw_was_closed = seen_at_flush
        self.assertFalse(buffered_was_closed)  # flush() called before file closed
        self.assertFalse(raw_was_closed)
        raw.flush = lambda: None  # break reference loop
    810 
    def test_close_error_on_close(self):
        # Both flush() and the raw close() fail; the error that escapes
        # close() must be the one from the raw close(), and the buffered
        # object must not report itself as closed.
        def bad_close():
            raise IOError('close')
        def bad_flush():
            raise IOError('flush')
        raw = self.MockRawIO()
        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(IOError) as caught: # exception not swallowed
            buffered.close()
        self.assertEqual(caught.exception.args, ('close',))
        self.assertFalse(buffered.closed)
    824 
    def test_multi_close(self):
        # close() may be called repeatedly; flush() after closing raises.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)
    832 
    def test_readonly_attributes(self):
        # Rebinding the .raw attribute must be rejected.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buffered.raw = replacement
    839 
    840 
    841 class SizeofTest:
    842 
    843     @support.cpython_only
    844     def test_sizeof(self):
    845         bufsize1 = 4096
    846         bufsize2 = 8192
    847         rawio = self.MockRawIO()
    848         bufio = self.tp(rawio, buffer_size=bufsize1)
    849         size = sys.getsizeof(bufio) - bufsize1
    850         rawio = self.MockRawIO()
    851         bufio = self.tp(rawio, buffer_size=bufsize2)
    852         self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
    853 
    854 
    855 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    856     read_mode = "rb"
    857 
    def test_constructor(self):
        # __init__ may be re-run on a live reader with valid sizes; invalid
        # buffer sizes raise ValueError, after which the reader can be
        # re-initialized and used again.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        for good_size in (1024, 16):
            reader.__init__(raw, buffer_size=good_size)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
        reader.__init__(self.MockRawIO([b"abc"]))
        self.assertEqual(b"abc", reader.read())
    871 
    def test_uninitialized(self):
        # Creating and discarding an object that never ran __init__ must
        # not blow up.
        discarded = self.tp.__new__(self.tp)
        del discarded
        reader = self.tp.__new__(self.tp)
        # Reading before __init__ is an error ...
        with self.assertRaisesRegexp((ValueError, AttributeError),
                                     'uninitialized|has no attribute'):
            reader.read(0)
        # ... and works once the object has been initialized.
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')
    881 
    def test_read(self):
        # read(None) and read(7) both return the complete 7-byte content.
        chunks = (b"abc", b"d", b"efg")
        reader = None
        for size in (None, 7):
            reader = self.tp(self.MockRawIO(chunks))
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args (checked on the last reader created above)
        self.assertRaises(ValueError, reader.read, -2)
    889 
    def test_read1(self):
        # After each read1() call, check both the returned data and the
        # number of reads the raw object has recorded so far.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size passed to read1, expected data, expected rawio._reads)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)
    906 
    def test_readinto(self):
        # Fill the same 2-byte buffer repeatedly; a short read leaves the
        # untouched tail of the buffer as it was.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (expected return value, expected buffer contents afterwards)
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for nread, contents in steps:
            self.assertEqual(bufio.readinto(target), nread)
            self.assertEqual(target, contents)
    921 
    def test_readlines(self):
        # readlines() with no hint, a hint of 5, and a hint of None, each
        # on a freshly built reader.
        chunks = (b"abc\n", b"d\n", b"ef")
        expected = list(chunks)
        def fresh_reader():
            return self.tp(self.MockRawIO(chunks))
        self.assertEqual(fresh_reader().readlines(), expected)
        self.assertEqual(fresh_reader().readlines(5), expected[:2])
        self.assertEqual(fresh_reader().readlines(None), expected)
    929 
    def test_buffering(self):
        # For several buffer sizes, issue a series of buffered reads and
        # compare the sizes of the raw reads they triggered.
        data = b"abcdefghi"
        total = len(data)

        # (buffer size, sizes of buffered reads, expected raw read history)
        scenarios = (
            (100, (3, 1, 4, 8), [total, 0]),
            (100, (3, 3, 3), [total]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, requested, raw_history in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                end = offset + nbytes
                self.assertEqual(bufio.read(nbytes), data[offset:end])
                offset = end
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_history)
    949 
    def test_read_non_blocking(self):
        # The None entries in the raw stream simulate EWOULDBLOCK.
        blocking_raw = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(blocking_raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # Same idea, directly on a raw object's readall().
        short_raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", short_raw.readall())
        self.assertIsNone(short_raw.readall())
    964 
    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        everything = reader.read(9000)
        self.assertEqual(b"abcdefg", everything)
    970 
    def test_read_all(self):
        # read() without a size returns the whole content.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        everything = reader.read()
        self.assertEqual(b"abcdefg", everything)
    976 
    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        # Twenty threads read from one shared reader; afterwards every byte
        # value must have been seen exactly N times overall.
        N = 1000
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def read_until_eof():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=read_until_eof)
                           for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for value in range(256):
                    one_byte = bytes(bytearray([value]))
                    self.assertEqual(combined.count(one_byte), N)
        finally:
            support.unlink(support.TESTFN)
   1017 
   1018     def test_misbehaved_io(self):
   1019         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
   1020         bufio = self.tp(rawio)
   1021         self.assertRaises(IOError, bufio.seek, 0)
   1022         self.assertRaises(IOError, bufio.tell)
   1023 
   1024     def test_no_extraneous_read(self):
   1025         # Issue #9550; when the raw IO object has satisfied the read request,
   1026         # we should not issue any additional reads, otherwise it may block
   1027         # (e.g. socket).
   1028         bufsize = 16
   1029         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
   1030             rawio = self.MockRawIO([b"x" * n])
   1031             bufio = self.tp(rawio, bufsize)
   1032             self.assertEqual(bufio.read(n), b"x" * n)
   1033             # Simple case: one raw read is enough to satisfy the request.
   1034             self.assertEqual(rawio._extraneous_reads, 0,
   1035                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
   1036             # A more complex case where two raw reads are needed to satisfy
   1037             # the request.
   1038             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
   1039             bufio = self.tp(rawio, bufsize)
   1040             self.assertEqual(bufio.read(n), b"x" * n)
   1041             self.assertEqual(rawio._extraneous_reads, 0,
   1042                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
   1043 
   1044 
   1045 class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
   1046     tp = io.BufferedReader
   1047 
   1048     def test_constructor(self):
   1049         BufferedReaderTest.test_constructor(self)
   1050         # The allocation can succeed on 32-bit builds, e.g. with more
   1051         # than 2GB RAM and a 64-bit kernel.
   1052         if sys.maxsize > 0x7FFFFFFF:
   1053             rawio = self.MockRawIO()
   1054             bufio = self.tp(rawio)
   1055             self.assertRaises((OverflowError, MemoryError, ValueError),
   1056                 bufio.__init__, rawio, sys.maxsize)
   1057 
   1058     def test_initialization(self):
   1059         rawio = self.MockRawIO([b"abc"])
   1060         bufio = self.tp(rawio)
   1061         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1062         self.assertRaises(ValueError, bufio.read)
   1063         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1064         self.assertRaises(ValueError, bufio.read)
   1065         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1066         self.assertRaises(ValueError, bufio.read)
   1067 
   1068     def test_misbehaved_io_read(self):
   1069         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
   1070         bufio = self.tp(rawio)
   1071         # _pyio.BufferedReader seems to implement reading different, so that
   1072         # checking this is not so easy.
   1073         self.assertRaises(IOError, bufio.read, 10)
   1074 
   1075     def test_garbage_collection(self):
   1076         # C BufferedReader objects are collected.
   1077         # The Python version has __del__, so it ends into gc.garbage instead
   1078         rawio = self.FileIO(support.TESTFN, "w+b")
   1079         f = self.tp(rawio)
   1080         f.f = f
   1081         wr = weakref.ref(f)
   1082         del f
   1083         support.gc_collect()
   1084         self.assertIsNone(wr(), wr)
   1085 
   1086     def test_args_error(self):
   1087         # Issue #17275
   1088         with self.assertRaisesRegexp(TypeError, "BufferedReader"):
   1089             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1090 
   1091 
   1092 class PyBufferedReaderTest(BufferedReaderTest):
   1093     tp = pyio.BufferedReader
   1094 
   1095 
   1096 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
   1097     write_mode = "wb"
   1098 
   1099     def test_constructor(self):
   1100         rawio = self.MockRawIO()
   1101         bufio = self.tp(rawio)
   1102         bufio.__init__(rawio)
   1103         bufio.__init__(rawio, buffer_size=1024)
   1104         bufio.__init__(rawio, buffer_size=16)
   1105         self.assertEqual(3, bufio.write(b"abc"))
   1106         bufio.flush()
   1107         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1108         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1109         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1110         bufio.__init__(rawio)
   1111         self.assertEqual(3, bufio.write(b"ghi"))
   1112         bufio.flush()
   1113         self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
   1114 
   1115     def test_uninitialized(self):
   1116         bufio = self.tp.__new__(self.tp)
   1117         del bufio
   1118         bufio = self.tp.__new__(self.tp)
   1119         self.assertRaisesRegexp((ValueError, AttributeError),
   1120                                 'uninitialized|has no attribute',
   1121                                 bufio.write, b'')
   1122         bufio.__init__(self.MockRawIO())
   1123         self.assertEqual(bufio.write(b''), 0)
   1124 
   1125     def test_detach_flush(self):
   1126         raw = self.MockRawIO()
   1127         buf = self.tp(raw)
   1128         buf.write(b"howdy!")
   1129         self.assertFalse(raw._write_stack)
   1130         buf.detach()
   1131         self.assertEqual(raw._write_stack, [b"howdy!"])
   1132 
   1133     def test_write(self):
   1134         # Write to the buffered IO but don't overflow the buffer.
   1135         writer = self.MockRawIO()
   1136         bufio = self.tp(writer, 8)
   1137         bufio.write(b"abc")
   1138         self.assertFalse(writer._write_stack)
   1139         buffer = bytearray(b"def")
   1140         bufio.write(buffer)
   1141         buffer[:] = b"***"  # Overwrite our copy of the data
   1142         bufio.flush()
   1143         self.assertEqual(b"".join(writer._write_stack), b"abcdef")
   1144 
   1145     def test_write_overflow(self):
   1146         writer = self.MockRawIO()
   1147         bufio = self.tp(writer, 8)
   1148         contents = b"abcdefghijklmnop"
   1149         for n in range(0, len(contents), 3):
   1150             bufio.write(contents[n:n+3])
   1151         flushed = b"".join(writer._write_stack)
   1152         # At least (total - 8) bytes were implicitly flushed, perhaps more
   1153         # depending on the implementation.
   1154         self.assertTrue(flushed.startswith(contents[:-8]), flushed)
   1155 
   1156     def check_writes(self, intermediate_func):
   1157         # Lots of writes, test the flushed output is as expected.
   1158         contents = bytes(range(256)) * 1000
   1159         n = 0
   1160         writer = self.MockRawIO()
   1161         bufio = self.tp(writer, 13)
   1162         # Generator of write sizes: repeat each N 15 times then proceed to N+1
   1163         def gen_sizes():
   1164             for size in count(1):
   1165                 for i in range(15):
   1166                     yield size
   1167         sizes = gen_sizes()
   1168         while n < len(contents):
   1169             size = min(next(sizes), len(contents) - n)
   1170             self.assertEqual(bufio.write(contents[n:n+size]), size)
   1171             intermediate_func(bufio)
   1172             n += size
   1173         bufio.flush()
   1174         self.assertEqual(contents,
   1175             b"".join(writer._write_stack))
   1176 
   1177     def test_writes(self):
   1178         self.check_writes(lambda bufio: None)
   1179 
   1180     def test_writes_and_flushes(self):
   1181         self.check_writes(lambda bufio: bufio.flush())
   1182 
   1183     def test_writes_and_seeks(self):
   1184         def _seekabs(bufio):
   1185             pos = bufio.tell()
   1186             bufio.seek(pos + 1, 0)
   1187             bufio.seek(pos - 1, 0)
   1188             bufio.seek(pos, 0)
   1189         self.check_writes(_seekabs)
   1190         def _seekrel(bufio):
   1191             pos = bufio.seek(0, 1)
   1192             bufio.seek(+1, 1)
   1193             bufio.seek(-1, 1)
   1194             bufio.seek(pos, 0)
   1195         self.check_writes(_seekrel)
   1196 
   1197     def test_writes_and_truncates(self):
   1198         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
   1199 
   1200     def test_write_non_blocking(self):
   1201         raw = self.MockNonBlockWriterIO()
   1202         bufio = self.tp(raw, 8)
   1203 
   1204         self.assertEqual(bufio.write(b"abcd"), 4)
   1205         self.assertEqual(bufio.write(b"efghi"), 5)
   1206         # 1 byte will be written, the rest will be buffered
   1207         raw.block_on(b"k")
   1208         self.assertEqual(bufio.write(b"jklmn"), 5)
   1209 
   1210         # 8 bytes will be written, 8 will be buffered and the rest will be lost
   1211         raw.block_on(b"0")
   1212         try:
   1213             bufio.write(b"opqrwxyz0123456789")
   1214         except self.BlockingIOError as e:
   1215             written = e.characters_written
   1216         else:
   1217             self.fail("BlockingIOError should have been raised")
   1218         self.assertEqual(written, 16)
   1219         self.assertEqual(raw.pop_written(),
   1220             b"abcdefghijklmnopqrwxyz")
   1221 
   1222         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
   1223         s = raw.pop_written()
   1224         # Previously buffered bytes were flushed
   1225         self.assertTrue(s.startswith(b"01234567A"), s)
   1226 
   1227     def test_write_and_rewind(self):
   1228         raw = io.BytesIO()
   1229         bufio = self.tp(raw, 4)
   1230         self.assertEqual(bufio.write(b"abcdef"), 6)
   1231         self.assertEqual(bufio.tell(), 6)
   1232         bufio.seek(0, 0)
   1233         self.assertEqual(bufio.write(b"XY"), 2)
   1234         bufio.seek(6, 0)
   1235         self.assertEqual(raw.getvalue(), b"XYcdef")
   1236         self.assertEqual(bufio.write(b"123456"), 6)
   1237         bufio.flush()
   1238         self.assertEqual(raw.getvalue(), b"XYcdef123456")
   1239 
   1240     def test_flush(self):
   1241         writer = self.MockRawIO()
   1242         bufio = self.tp(writer, 8)
   1243         bufio.write(b"abc")
   1244         bufio.flush()
   1245         self.assertEqual(b"abc", writer._write_stack[0])
   1246 
   1247     def test_writelines(self):
   1248         l = [b'ab', b'cd', b'ef']
   1249         writer = self.MockRawIO()
   1250         bufio = self.tp(writer, 8)
   1251         bufio.writelines(l)
   1252         bufio.flush()
   1253         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1254 
   1255     def test_writelines_userlist(self):
   1256         l = UserList([b'ab', b'cd', b'ef'])
   1257         writer = self.MockRawIO()
   1258         bufio = self.tp(writer, 8)
   1259         bufio.writelines(l)
   1260         bufio.flush()
   1261         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1262 
   1263     def test_writelines_error(self):
   1264         writer = self.MockRawIO()
   1265         bufio = self.tp(writer, 8)
   1266         self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
   1267         self.assertRaises(TypeError, bufio.writelines, None)
   1268 
   1269     def test_destructor(self):
   1270         writer = self.MockRawIO()
   1271         bufio = self.tp(writer, 8)
   1272         bufio.write(b"abc")
   1273         del bufio
   1274         support.gc_collect()
   1275         self.assertEqual(b"abc", writer._write_stack[0])
   1276 
   1277     def test_truncate(self):
   1278         # Truncate implicitly flushes the buffer.
   1279         with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1280             bufio = self.tp(raw, 8)
   1281             bufio.write(b"abcdef")
   1282             self.assertEqual(bufio.truncate(3), 3)
   1283             self.assertEqual(bufio.tell(), 6)
   1284         with self.open(support.TESTFN, "rb", buffering=0) as f:
   1285             self.assertEqual(f.read(), b"abc")
   1286 
   1287     @unittest.skipUnless(threading, 'Threading required for this test.')
   1288     @support.requires_resource('cpu')
   1289     def test_threads(self):
   1290         try:
   1291             # Write out many bytes from many threads and test they were
   1292             # all flushed.
   1293             N = 1000
   1294             contents = bytes(range(256)) * N
   1295             sizes = cycle([1, 19])
   1296             n = 0
   1297             queue = deque()
   1298             while n < len(contents):
   1299                 size = next(sizes)
   1300                 queue.append(contents[n:n+size])
   1301                 n += size
   1302             del contents
   1303             # We use a real file object because it allows us to
   1304             # exercise situations where the GIL is released before
   1305             # writing the buffer to the raw streams. This is in addition
   1306             # to concurrency issues due to switching threads in the middle
   1307             # of Python code.
   1308             with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1309                 bufio = self.tp(raw, 8)
   1310                 errors = []
   1311                 def f():
   1312                     try:
   1313                         while True:
   1314                             try:
   1315                                 s = queue.popleft()
   1316                             except IndexError:
   1317                                 return
   1318                             bufio.write(s)
   1319                     except Exception as e:
   1320                         errors.append(e)
   1321                         raise
   1322                 threads = [threading.Thread(target=f) for x in range(20)]
   1323                 with support.start_threads(threads):
   1324                     time.sleep(0.02) # yield
   1325                 self.assertFalse(errors,
   1326                     "the following exceptions were caught: %r" % errors)
   1327                 bufio.close()
   1328             with self.open(support.TESTFN, "rb") as f:
   1329                 s = f.read()
   1330             for i in range(256):
   1331                 self.assertEqual(s.count(bytes([i])), N)
   1332         finally:
   1333             support.unlink(support.TESTFN)
   1334 
   1335     def test_misbehaved_io(self):
   1336         rawio = self.MisbehavedRawIO()
   1337         bufio = self.tp(rawio, 5)
   1338         self.assertRaises(IOError, bufio.seek, 0)
   1339         self.assertRaises(IOError, bufio.tell)
   1340         self.assertRaises(IOError, bufio.write, b"abcdef")
   1341 
   1342     def test_max_buffer_size_deprecation(self):
   1343         with support.check_warnings(("max_buffer_size is deprecated",
   1344                                      DeprecationWarning)):
   1345             self.tp(self.MockRawIO(), 8, 12)
   1346 
   1347     def test_write_error_on_close(self):
   1348         raw = self.MockRawIO()
   1349         def bad_write(b):
   1350             raise IOError()
   1351         raw.write = bad_write
   1352         b = self.tp(raw)
   1353         b.write(b'spam')
   1354         self.assertRaises(IOError, b.close) # exception not swallowed
   1355         self.assertTrue(b.closed)
   1356 
   1357 
   1358 class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
   1359     tp = io.BufferedWriter
   1360 
   1361     def test_constructor(self):
   1362         BufferedWriterTest.test_constructor(self)
   1363         # The allocation can succeed on 32-bit builds, e.g. with more
   1364         # than 2GB RAM and a 64-bit kernel.
   1365         if sys.maxsize > 0x7FFFFFFF:
   1366             rawio = self.MockRawIO()
   1367             bufio = self.tp(rawio)
   1368             self.assertRaises((OverflowError, MemoryError, ValueError),
   1369                 bufio.__init__, rawio, sys.maxsize)
   1370 
   1371     def test_initialization(self):
   1372         rawio = self.MockRawIO()
   1373         bufio = self.tp(rawio)
   1374         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1375         self.assertRaises(ValueError, bufio.write, b"def")
   1376         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1377         self.assertRaises(ValueError, bufio.write, b"def")
   1378         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1379         self.assertRaises(ValueError, bufio.write, b"def")
   1380 
   1381     def test_garbage_collection(self):
   1382         # C BufferedWriter objects are collected, and collecting them flushes
   1383         # all data to disk.
   1384         # The Python version has __del__, so it ends into gc.garbage instead
   1385         rawio = self.FileIO(support.TESTFN, "w+b")
   1386         f = self.tp(rawio)
   1387         f.write(b"123xxx")
   1388         f.x = f
   1389         wr = weakref.ref(f)
   1390         del f
   1391         support.gc_collect()
   1392         self.assertIsNone(wr(), wr)
   1393         with self.open(support.TESTFN, "rb") as f:
   1394             self.assertEqual(f.read(), b"123xxx")
   1395 
   1396     def test_args_error(self):
   1397         # Issue #17275
   1398         with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
   1399             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1400 
   1401 
   1402 class PyBufferedWriterTest(BufferedWriterTest):
   1403     tp = pyio.BufferedWriter
   1404 
   1405 class BufferedRWPairTest(unittest.TestCase):
   1406 
   1407     def test_constructor(self):
   1408         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1409         self.assertFalse(pair.closed)
   1410 
   1411     def test_uninitialized(self):
   1412         pair = self.tp.__new__(self.tp)
   1413         del pair
   1414         pair = self.tp.__new__(self.tp)
   1415         self.assertRaisesRegexp((ValueError, AttributeError),
   1416                                 'uninitialized|has no attribute',
   1417                                 pair.read, 0)
   1418         self.assertRaisesRegexp((ValueError, AttributeError),
   1419                                 'uninitialized|has no attribute',
   1420                                 pair.write, b'')
   1421         pair.__init__(self.MockRawIO(), self.MockRawIO())
   1422         self.assertEqual(pair.read(0), b'')
   1423         self.assertEqual(pair.write(b''), 0)
   1424 
   1425     def test_detach(self):
   1426         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1427         self.assertRaises(self.UnsupportedOperation, pair.detach)
   1428 
   1429     def test_constructor_max_buffer_size_deprecation(self):
   1430         with support.check_warnings(("max_buffer_size is deprecated",
   1431                                      DeprecationWarning)):
   1432             self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
   1433 
   1434     def test_constructor_with_not_readable(self):
   1435         class NotReadable(MockRawIO):
   1436             def readable(self):
   1437                 return False
   1438 
   1439         self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
   1440 
   1441     def test_constructor_with_not_writeable(self):
   1442         class NotWriteable(MockRawIO):
   1443             def writable(self):
   1444                 return False
   1445 
   1446         self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
   1447 
   1448     def test_read(self):
   1449         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1450 
   1451         self.assertEqual(pair.read(3), b"abc")
   1452         self.assertEqual(pair.read(1), b"d")
   1453         self.assertEqual(pair.read(), b"ef")
   1454         pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
   1455         self.assertEqual(pair.read(None), b"abc")
   1456 
   1457     def test_readlines(self):
   1458         pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
   1459         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1460         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1461         self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
   1462 
   1463     def test_read1(self):
   1464         # .read1() is delegated to the underlying reader object, so this test
   1465         # can be shallow.
   1466         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1467 
   1468         self.assertEqual(pair.read1(3), b"abc")
   1469 
   1470     def test_readinto(self):
   1471         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1472 
   1473         data = byteslike(5)
   1474         self.assertEqual(pair.readinto(data), 5)
   1475         self.assertEqual(data.tobytes(), b"abcde")
   1476 
   1477     def test_write(self):
   1478         w = self.MockRawIO()
   1479         pair = self.tp(self.MockRawIO(), w)
   1480 
   1481         pair.write(b"abc")
   1482         pair.flush()
   1483         buffer = bytearray(b"def")
   1484         pair.write(buffer)
   1485         buffer[:] = b"***"  # Overwrite our copy of the data
   1486         pair.flush()
   1487         self.assertEqual(w._write_stack, [b"abc", b"def"])
   1488 
   1489     def test_peek(self):
   1490         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1491 
   1492         self.assertTrue(pair.peek(3).startswith(b"abc"))
   1493         self.assertEqual(pair.read(3), b"abc")
   1494 
   1495     def test_readable(self):
   1496         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1497         self.assertTrue(pair.readable())
   1498 
   1499     def test_writeable(self):
   1500         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1501         self.assertTrue(pair.writable())
   1502 
   1503     def test_seekable(self):
   1504         # BufferedRWPairs are never seekable, even if their readers and writers
   1505         # are.
   1506         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1507         self.assertFalse(pair.seekable())
   1508 
   1509     # .flush() is delegated to the underlying writer object and has been
   1510     # tested in the test_write method.
   1511 
   1512     def test_close_and_closed(self):
   1513         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1514         self.assertFalse(pair.closed)
   1515         pair.close()
   1516         self.assertTrue(pair.closed)
   1517 
   1518     def test_reader_close_error_on_close(self):
   1519         def reader_close():
   1520             reader_non_existing
   1521         reader = self.MockRawIO()
   1522         reader.close = reader_close
   1523         writer = self.MockRawIO()
   1524         pair = self.tp(reader, writer)
   1525         with self.assertRaises(NameError) as err:
   1526             pair.close()
   1527         self.assertIn('reader_non_existing', str(err.exception))
   1528         self.assertTrue(pair.closed)
   1529         self.assertFalse(reader.closed)
   1530         self.assertTrue(writer.closed)
   1531 
   1532     def test_writer_close_error_on_close(self):
   1533         def writer_close():
   1534             writer_non_existing
   1535         reader = self.MockRawIO()
   1536         writer = self.MockRawIO()
   1537         writer.close = writer_close
   1538         pair = self.tp(reader, writer)
   1539         with self.assertRaises(NameError) as err:
   1540             pair.close()
   1541         self.assertIn('writer_non_existing', str(err.exception))
   1542         self.assertFalse(pair.closed)
   1543         self.assertTrue(reader.closed)
   1544         self.assertFalse(writer.closed)
   1545 
   1546     def test_reader_writer_close_error_on_close(self):
   1547         def reader_close():
   1548             reader_non_existing
   1549         def writer_close():
   1550             writer_non_existing
   1551         reader = self.MockRawIO()
   1552         reader.close = reader_close
   1553         writer = self.MockRawIO()
   1554         writer.close = writer_close
   1555         pair = self.tp(reader, writer)
   1556         with self.assertRaises(NameError) as err:
   1557             pair.close()
   1558         self.assertIn('reader_non_existing', str(err.exception))
   1559         self.assertFalse(pair.closed)
   1560         self.assertFalse(reader.closed)
   1561         self.assertFalse(writer.closed)
   1562 
   1563     def test_isatty(self):
   1564         class SelectableIsAtty(MockRawIO):
   1565             def __init__(self, isatty):
   1566                 MockRawIO.__init__(self)
   1567                 self._isatty = isatty
   1568 
   1569             def isatty(self):
   1570                 return self._isatty
   1571 
   1572         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
   1573         self.assertFalse(pair.isatty())
   1574 
   1575         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
   1576         self.assertTrue(pair.isatty())
   1577 
   1578         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
   1579         self.assertTrue(pair.isatty())
   1580 
   1581         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
   1582         self.assertTrue(pair.isatty())
   1583 
   1584     def test_weakref_clearing(self):
   1585         brw = self.tp(self.MockRawIO(), self.MockRawIO())
   1586         ref = weakref.ref(brw)
   1587         brw = None
   1588         ref = None # Shouldn't segfault.
   1589 
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the BufferedRWPair tests against the C implementation (io).
    tp = io.BufferedRWPair
   1592 
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the BufferedRWPair tests against the Python implementation (_pyio).
    tp = pyio.BufferedRWPair
   1595 
   1596 
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for the read/write buffered class bound to self.tp (subclasses
    # set tp to the C or the Python BufferedRandom).  Combines the reader and
    # writer test classes and adds checks for interleaved reads, writes and
    # seeks on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both bases define this test; run each explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        # Both bases define this test; run each explicitly.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        # Writes issued after a partial read must stay buffered until the
        # next read(), and then reach the raw stream as one chunk.
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        # Checks tell() after reads, writes and seeks with each whence value
        # (0 = absolute, 1 = relative, 2 = from end).
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at position 4, then re-read everything.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        # Asking for more than remains returns only the remaining bytes.
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float position must be rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is called
        # as read_func(bufio, n) or read_func(bufio) and must return the bytes
        # it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        # Scenario above, reading through read().
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        # Scenario above, reading through readinto().
        def _readinto(bufio, n=-1):
            # n < 0 means "read everything": use a buffer far larger than
            # the 9-byte stream.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        # Scenario above, reading through peek() followed by a relative seek.
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        # Two flushed writes must land back to back at the start of the raw
        # stream, and a subsequent read must see them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Both bases define this test; run each explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # Runs the inherited check_writes() scenario twice, with two
        # different peek-based callbacks.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            # Step back one byte, peek, then restore the position.
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # check_writes() with a callback that re-reads the previous byte.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # check_writes() with a callback that re-reads the previous byte
        # through read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # check_writes() with a callback that re-reads the previous byte
        # through readinto().
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # The two sizes are one smaller and one larger than the 4-byte buffer.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            # Write 0x02 at pos2 (after reading up to it), then 0x01 at pos1.
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        # Try every pair of positions i <= j within the 5-byte payload.
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                expected = bytearray(b)
                # Same order as the writes: when i == j the later 0x01 wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument must return the logical position (2,
        # then 4), not a position skewed by buffered data.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Both bases define this test; run each explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each kind of one-byte read; every
        # operation must act on the byte following the previous one.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write replaces the first byte of a line; readline()
        # must then return the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
   1821 
   1822 
   1823 class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
   1824                           BufferedRandomTest, SizeofTest):
   1825     tp = io.BufferedRandom
   1826 
   1827     def test_constructor(self):
   1828         BufferedRandomTest.test_constructor(self)
   1829         # The allocation can succeed on 32-bit builds, e.g. with more
   1830         # than 2GB RAM and a 64-bit kernel.
   1831         if sys.maxsize > 0x7FFFFFFF:
   1832             rawio = self.MockRawIO()
   1833             bufio = self.tp(rawio)
   1834             self.assertRaises((OverflowError, MemoryError, ValueError),
   1835                 bufio.__init__, rawio, sys.maxsize)
   1836 
   1837     def test_garbage_collection(self):
   1838         CBufferedReaderTest.test_garbage_collection(self)
   1839         CBufferedWriterTest.test_garbage_collection(self)
   1840 
   1841     def test_args_error(self):
   1842         # Issue #17275
   1843         with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
   1844             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1845 
   1846 
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the BufferedRandom tests against the Python implementation (_pyio).
    tp = pyio.BufferedRandom
   1849 
   1850 
   1851 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
   1852 # properties:
   1853 #   - A single output character can correspond to many bytes of input.
   1854 #   - The number of input bytes to complete the character can be
   1855 #     undetermined until the last input byte is received.
   1856 #   - The number of input bytes can vary depending on previous input.
   1857 #   - A single input byte can correspond to many characters of output.
   1858 #   - The number of output characters can be undetermined until the
   1859 #     last input byte is received.
   1860 #   - The number of output characters can vary depending on previous input.
   1861 
   1862 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
   1863     """
   1864     For testing seek/tell behavior with a stateful, buffering decoder.
   1865 
   1866     Input is a sequence of words.  Words may be fixed-length (length set
   1867     by input) or variable-length (period-terminated).  In variable-length
   1868     mode, extra periods are ignored.  Possible words are:
   1869       - 'i' followed by a number sets the input length, I (maximum 99).
   1870         When I is set to 0, words are space-terminated.
   1871       - 'o' followed by a number sets the output length, O (maximum 99).
   1872       - Any other word is converted into a word followed by a period on
   1873         the output.  The output word consists of the input word truncated
   1874         or padded out with hyphens to make its length equal to O.  If O
   1875         is 0, the word is output verbatim without truncating or padding.
   1876     I and O are initially set to 1.  When I changes, any buffered input is
   1877     re-scanned according to the new I.  EOF also terminates the last word.
   1878     """
   1879 
   1880     def __init__(self, errors='strict'):
   1881         codecs.IncrementalDecoder.__init__(self, errors)
   1882         self.reset()
   1883 
   1884     def __repr__(self):
   1885         return '<SID %x>' % id(self)
   1886 
   1887     def reset(self):
   1888         self.i = 1
   1889         self.o = 1
   1890         self.buffer = bytearray()
   1891 
   1892     def getstate(self):
   1893         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
   1894         return bytes(self.buffer), i*100 + o
   1895 
   1896     def setstate(self, state):
   1897         buffer, io = state
   1898         self.buffer = bytearray(buffer)
   1899         i, o = divmod(io, 100)
   1900         self.i, self.o = i ^ 1, o ^ 1
   1901 
   1902     def decode(self, input, final=False):
   1903         output = ''
   1904         for b in input:
   1905             if self.i == 0: # variable-length, terminated with period
   1906                 if b == '.':
   1907                     if self.buffer:
   1908                         output += self.process_word()
   1909                 else:
   1910                     self.buffer.append(b)
   1911             else: # fixed-length, terminate after self.i bytes
   1912                 self.buffer.append(b)
   1913                 if len(self.buffer) == self.i:
   1914                     output += self.process_word()
   1915         if final and self.buffer: # EOF terminates the last word
   1916             output += self.process_word()
   1917         return output
   1918 
   1919     def process_word(self):
   1920         output = ''
   1921         if self.buffer[0] == ord('i'):
   1922             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
   1923         elif self.buffer[0] == ord('o'):
   1924             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
   1925         else:
   1926             output = self.buffer.decode('ascii')
   1927             if len(output) < self.o:
   1928                 output += '-'*self.o # pad out with hyphens
   1929             if self.o:
   1930                 output = output[:self.o] # truncate to output length
   1931             output += '.'
   1932         self.buffer = bytearray()
   1933         return output
   1934 
   1935     codecEnabled = False
   1936 
   1937     @classmethod
   1938     def lookupTestDecoder(cls, name):
   1939         if cls.codecEnabled and name == 'test_decoder':
   1940             latin1 = codecs.lookup('latin-1')
   1941             return codecs.CodecInfo(
   1942                 name='test_decoder', encode=latin1.encode, decode=None,
   1943                 incrementalencoder=None,
   1944                 streamreader=None, streamwriter=None,
   1945                 incrementaldecoder=cls)
   1946 
# Register the lookup function of the decoder defined above for testing.
# It resolves nothing until a test sets
# StatefulIncrementalDecoder.codecEnabled to True.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
   1950 
   1951 
   1952 class StatefulIncrementalDecoderTest(unittest.TestCase):
   1953     """
   1954     Make sure the StatefulIncrementalDecoder actually works.
   1955     """
   1956 
   1957     test_cases = [
   1958         # I=1, O=1 (fixed-length input == fixed-length output)
   1959         (b'abcd', False, 'a.b.c.d.'),
   1960         # I=0, O=0 (variable-length input, variable-length output)
   1961         (b'oiabcd', True, 'abcd.'),
   1962         # I=0, O=0 (should ignore extra periods)
   1963         (b'oi...abcd...', True, 'abcd.'),
   1964         # I=0, O=6 (variable-length input, fixed-length output)
   1965         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
   1966         # I=2, O=6 (fixed-length input < fixed-length output)
   1967         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
   1968         # I=6, O=3 (fixed-length input > fixed-length output)
   1969         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
   1970         # I=0, then 3; O=29, then 15 (with longer output)
   1971         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
   1972          'a----------------------------.' +
   1973          'b----------------------------.' +
   1974          'cde--------------------------.' +
   1975          'abcdefghijabcde.' +
   1976          'a.b------------.' +
   1977          '.c.------------.' +
   1978          'd.e------------.' +
   1979          'k--------------.' +
   1980          'l--------------.' +
   1981          'm--------------.')
   1982     ]
   1983 
   1984     def test_decoder(self):
   1985         # Try a few one-shot test cases.
   1986         for input, eof, output in self.test_cases:
   1987             d = StatefulIncrementalDecoder()
   1988             self.assertEqual(d.decode(input, eof), output)
   1989 
   1990         # Also test an unfinished decode, followed by forcing EOF.
   1991         d = StatefulIncrementalDecoder()
   1992         self.assertEqual(d.decode(b'oiabcd'), '')
   1993         self.assertEqual(d.decode(b'', 1), 'abcd.')
   1994 
   1995 class TextIOWrapperTest(unittest.TestCase):
   1996 
   1997     def setUp(self):
   1998         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
   1999         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
   2000         support.unlink(support.TESTFN)
   2001 
    def tearDown(self):
        # Clean up the scratch path support.TESTFN after each test
        # (support.unlink is defined in the test support module; presumably
        # it tolerates a missing file -- confirm there).
        support.unlink(support.TESTFN)
   2004 
   2005     def test_constructor(self):
   2006         r = self.BytesIO(b"\xc3\xa9\n\n")
   2007         b = self.BufferedReader(r, 1000)
   2008         t = self.TextIOWrapper(b)
   2009         t.__init__(b, encoding="latin1", newline="\r\n")
   2010         self.assertEqual(t.encoding, "latin1")
   2011         self.assertEqual(t.line_buffering, False)
   2012         t.__init__(b, encoding="utf8", line_buffering=True)
   2013         self.assertEqual(t.encoding, "utf8")
   2014         self.assertEqual(t.line_buffering, True)
   2015         self.assertEqual("\xe9\n", t.readline())
   2016         self.assertRaises(TypeError, t.__init__, b, newline=42)
   2017         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   2018 
   2019     def test_uninitialized(self):
   2020         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   2021         del t
   2022         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   2023         self.assertRaises(Exception, repr, t)
   2024         self.assertRaisesRegexp((ValueError, AttributeError),
   2025                                 'uninitialized|has no attribute',
   2026                                 t.read, 0)
   2027         t.__init__(self.MockRawIO())
   2028         self.assertEqual(t.read(0), u'')
   2029 
   2030     def test_non_text_encoding_codecs_are_rejected(self):
   2031         # Ensure the constructor complains if passed a codec that isn't
   2032         # marked as a text encoding
   2033         # http://bugs.python.org/issue20404
   2034         r = self.BytesIO()
   2035         b = self.BufferedWriter(r)
   2036         with support.check_py3k_warnings():
   2037             self.TextIOWrapper(b, encoding="hex_codec")
   2038 
   2039     def test_detach(self):
   2040         r = self.BytesIO()
   2041         b = self.BufferedWriter(r)
   2042         t = self.TextIOWrapper(b)
   2043         self.assertIs(t.detach(), b)
   2044 
   2045         t = self.TextIOWrapper(b, encoding="ascii")
   2046         t.write("howdy")
   2047         self.assertFalse(r.getvalue())
   2048         t.detach()
   2049         self.assertEqual(r.getvalue(), b"howdy")
   2050         self.assertRaises(ValueError, t.detach)
   2051 
   2052         # Operations independent of the detached stream should still work
   2053         repr(t)
   2054         self.assertEqual(t.encoding, "ascii")
   2055         self.assertEqual(t.errors, "strict")
   2056         self.assertFalse(t.line_buffering)
   2057 
   2058     def test_repr(self):
   2059         raw = self.BytesIO("hello".encode("utf-8"))
   2060         b = self.BufferedReader(raw)
   2061         t = self.TextIOWrapper(b, encoding="utf-8")
   2062         modname = self.TextIOWrapper.__module__
   2063         self.assertEqual(repr(t),
   2064                          "<%s.TextIOWrapper encoding='utf-8'>" % modname)
   2065         raw.name = "dummy"
   2066         self.assertEqual(repr(t),
   2067                          "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
   2068         raw.name = b"dummy"
   2069         self.assertEqual(repr(t),
   2070                          "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
   2071 
   2072         t.buffer.detach()
   2073         repr(t)  # Should not raise an exception
   2074 
   2075     def test_line_buffering(self):
   2076         r = self.BytesIO()
   2077         b = self.BufferedWriter(r, 1000)
   2078         t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
   2079         t.write("X")
   2080         self.assertEqual(r.getvalue(), b"")  # No flush happened
   2081         t.write("Y\nZ")
   2082         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
   2083         t.write("A\rB")
   2084         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
   2085 
   2086     def test_encoding(self):
   2087         # Check the encoding attribute is always set, and valid
   2088         b = self.BytesIO()
   2089         t = self.TextIOWrapper(b, encoding="utf8")
   2090         self.assertEqual(t.encoding, "utf8")
   2091         t = self.TextIOWrapper(b)
   2092         self.assertIsNotNone(t.encoding)
   2093         codecs.lookup(t.encoding)
   2094 
   2095     def test_encoding_errors_reading(self):
   2096         # (1) default
   2097         b = self.BytesIO(b"abc\n\xff\n")
   2098         t = self.TextIOWrapper(b, encoding="ascii")
   2099         self.assertRaises(UnicodeError, t.read)
   2100         # (2) explicit strict
   2101         b = self.BytesIO(b"abc\n\xff\n")
   2102         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   2103         self.assertRaises(UnicodeError, t.read)
   2104         # (3) ignore
   2105         b = self.BytesIO(b"abc\n\xff\n")
   2106         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
   2107         self.assertEqual(t.read(), "abc\n\n")
   2108         # (4) replace
   2109         b = self.BytesIO(b"abc\n\xff\n")
   2110         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
   2111         self.assertEqual(t.read(), "abc\n\ufffd\n")
   2112 
   2113     def test_encoding_errors_writing(self):
   2114         # (1) default
   2115         b = self.BytesIO()
   2116         t = self.TextIOWrapper(b, encoding="ascii")
   2117         self.assertRaises(UnicodeError, t.write, "\xff")
   2118         # (2) explicit strict
   2119         b = self.BytesIO()
   2120         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   2121         self.assertRaises(UnicodeError, t.write, "\xff")
   2122         # (3) ignore
   2123         b = self.BytesIO()
   2124         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
   2125                              newline="\n")
   2126         t.write("abc\xffdef\n")
   2127         t.flush()
   2128         self.assertEqual(b.getvalue(), b"abcdef\n")
   2129         # (4) replace
   2130         b = self.BytesIO()
   2131         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
   2132                              newline="\n")
   2133         t.write("abc\xffdef\n")
   2134         t.flush()
   2135         self.assertEqual(b.getvalue(), b"abc?def\n")
   2136 
   2137     def test_newlines(self):
   2138         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
   2139 
   2140         tests = [
   2141             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
   2142             [ '', input_lines ],
   2143             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
   2144             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
   2145             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
   2146         ]
   2147         encodings = (
   2148             'utf-8', 'latin-1',
   2149             'utf-16', 'utf-16-le', 'utf-16-be',
   2150             'utf-32', 'utf-32-le', 'utf-32-be',
   2151         )
   2152 
   2153         # Try a range of buffer sizes to test the case where \r is the last
   2154         # character in TextIOWrapper._pending_line.
   2155         for encoding in encodings:
   2156             # XXX: str.encode() should return bytes
   2157             data = bytes(''.join(input_lines).encode(encoding))
   2158             for do_reads in (False, True):
   2159                 for bufsize in range(1, 10):
   2160                     for newline, exp_lines in tests:
   2161                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
   2162                         textio = self.TextIOWrapper(bufio, newline=newline,
   2163                                                   encoding=encoding)
   2164                         if do_reads:
   2165                             got_lines = []
   2166                             while True:
   2167                                 c2 = textio.read(2)
   2168                                 if c2 == '':
   2169                                     break
   2170                                 self.assertEqual(len(c2), 2)
   2171                                 got_lines.append(c2 + textio.readline())
   2172                         else:
   2173                             got_lines = list(textio)
   2174 
   2175                         for got_line, exp_line in zip(got_lines, exp_lines):
   2176                             self.assertEqual(got_line, exp_line)
   2177                         self.assertEqual(len(got_lines), len(exp_lines))
   2178 
   2179     def test_newlines_input(self):
   2180         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
   2181         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
   2182         for newline, expected in [
   2183             (None, normalized.decode("ascii").splitlines(True)),
   2184             ("", testdata.decode("ascii").splitlines(True)),
   2185             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   2186             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   2187             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
   2188             ]:
   2189             buf = self.BytesIO(testdata)
   2190             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2191             self.assertEqual(txt.readlines(), expected)
   2192             txt.seek(0)
   2193             self.assertEqual(txt.read(), "".join(expected))
   2194 
   2195     def test_newlines_output(self):
   2196         testdict = {
   2197             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2198             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2199             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
   2200             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
   2201             }
   2202         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
   2203         for newline, expected in tests:
   2204             buf = self.BytesIO()
   2205             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2206             txt.write("AAA\nB")
   2207             txt.write("BB\nCCC\n")
   2208             txt.write("X\rY\r\nZ")
   2209             txt.flush()
   2210             self.assertEqual(buf.closed, False)
   2211             self.assertEqual(buf.getvalue(), expected)
   2212 
   2213     def test_destructor(self):
   2214         l = []
   2215         base = self.BytesIO
   2216         class MyBytesIO(base):
   2217             def close(self):
   2218                 l.append(self.getvalue())
   2219                 base.close(self)
   2220         b = MyBytesIO()
   2221         t = self.TextIOWrapper(b, encoding="ascii")
   2222         t.write("abc")
   2223         del t
   2224         support.gc_collect()
   2225         self.assertEqual([b"abc"], l)
   2226 
   2227     def test_override_destructor(self):
   2228         record = []
   2229         class MyTextIO(self.TextIOWrapper):
   2230             def __del__(self):
   2231                 record.append(1)
   2232                 try:
   2233                     f = super(MyTextIO, self).__del__
   2234                 except AttributeError:
   2235                     pass
   2236                 else:
   2237                     f()
   2238             def close(self):
   2239                 record.append(2)
   2240                 super(MyTextIO, self).close()
   2241             def flush(self):
   2242                 record.append(3)
   2243                 super(MyTextIO, self).flush()
   2244         b = self.BytesIO()
   2245         t = MyTextIO(b, encoding="ascii")
   2246         del t
   2247         support.gc_collect()
   2248         self.assertEqual(record, [1, 2, 3])
   2249 
   2250     def test_error_through_destructor(self):
   2251         # Test that the exception state is not modified by a destructor,
   2252         # even if close() fails.
   2253         rawio = self.CloseFailureIO()
   2254         def f():
   2255             self.TextIOWrapper(rawio).xyzzy
   2256         with support.captured_output("stderr") as s:
   2257             self.assertRaises(AttributeError, f)
   2258         s = s.getvalue().strip()
   2259         if s:
   2260             # The destructor *may* have printed an unraisable error, check it
   2261             self.assertEqual(len(s.splitlines()), 1)
   2262             self.assertTrue(s.startswith("Exception IOError: "), s)
   2263             self.assertTrue(s.endswith(" ignored"), s)
   2264 
   2265     # Systematic tests of the text I/O API
   2266 
   2267     def test_basic_io(self):
   2268         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
   2269             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
   2270                 f = self.open(support.TESTFN, "w+", encoding=enc)
   2271                 f._CHUNK_SIZE = chunksize
   2272                 self.assertEqual(f.write("abc"), 3)
   2273                 f.close()
   2274                 f = self.open(support.TESTFN, "r+", encoding=enc)
   2275                 f._CHUNK_SIZE = chunksize
   2276                 self.assertEqual(f.tell(), 0)
   2277                 self.assertEqual(f.read(), "abc")
   2278                 cookie = f.tell()
   2279                 self.assertEqual(f.seek(0), 0)
   2280                 self.assertEqual(f.read(None), "abc")
   2281                 f.seek(0)
   2282                 self.assertEqual(f.read(2), "ab")
   2283                 self.assertEqual(f.read(1), "c")
   2284                 self.assertEqual(f.read(1), "")
   2285                 self.assertEqual(f.read(), "")
   2286                 self.assertEqual(f.tell(), cookie)
   2287                 self.assertEqual(f.seek(0), 0)
   2288                 self.assertEqual(f.seek(0, 2), cookie)
   2289                 self.assertEqual(f.write("def"), 3)
   2290                 self.assertEqual(f.seek(cookie), cookie)
   2291                 self.assertEqual(f.read(), "def")
   2292                 if enc.startswith("utf"):
   2293                     self.multi_line_test(f, enc)
   2294                 f.close()
   2295 
   2296     def multi_line_test(self, f, enc):
   2297         f.seek(0)
   2298         f.truncate()
   2299         sample = "s\xff\u0fff\uffff"
   2300         wlines = []
   2301         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
   2302             chars = []
   2303             for i in range(size):
   2304                 chars.append(sample[i % len(sample)])
   2305             line = "".join(chars) + "\n"
   2306             wlines.append((f.tell(), line))
   2307             f.write(line)
   2308         f.seek(0)
   2309         rlines = []
   2310         while True:
   2311             pos = f.tell()
   2312             line = f.readline()
   2313             if not line:
   2314                 break
   2315             rlines.append((pos, line))
   2316         self.assertEqual(rlines, wlines)
   2317 
   2318     def test_telling(self):
   2319         f = self.open(support.TESTFN, "w+", encoding="utf8")
   2320         p0 = f.tell()
   2321         f.write("\xff\n")
   2322         p1 = f.tell()
   2323         f.write("\xff\n")
   2324         p2 = f.tell()
   2325         f.seek(0)
   2326         self.assertEqual(f.tell(), p0)
   2327         self.assertEqual(f.readline(), "\xff\n")
   2328         self.assertEqual(f.tell(), p1)
   2329         self.assertEqual(f.readline(), "\xff\n")
   2330         self.assertEqual(f.tell(), p2)
   2331         f.seek(0)
   2332         for line in f:
   2333             self.assertEqual(line, "\xff\n")
   2334             self.assertRaises(IOError, f.tell)
   2335         self.assertEqual(f.tell(), p2)
   2336         f.close()
   2337 
   2338     def test_seeking(self):
   2339         chunk_size = _default_chunk_size()
   2340         prefix_size = chunk_size - 2
   2341         u_prefix = "a" * prefix_size
   2342         prefix = bytes(u_prefix.encode("utf-8"))
   2343         self.assertEqual(len(u_prefix), len(prefix))
   2344         u_suffix = "\u8888\n"
   2345         suffix = bytes(u_suffix.encode("utf-8"))
   2346         line = prefix + suffix
   2347         f = self.open(support.TESTFN, "wb")
   2348         f.write(line*2)
   2349         f.close()
   2350         f = self.open(support.TESTFN, "r", encoding="utf-8")
   2351         s = f.read(prefix_size)
   2352         self.assertEqual(s, prefix.decode("ascii"))
   2353         self.assertEqual(f.tell(), prefix_size)
   2354         self.assertEqual(f.readline(), u_suffix)
   2355 
   2356     def test_seeking_too(self):
   2357         # Regression test for a specific bug
   2358         data = b'\xe0\xbf\xbf\n'
   2359         f = self.open(support.TESTFN, "wb")
   2360         f.write(data)
   2361         f.close()
   2362         f = self.open(support.TESTFN, "r", encoding="utf-8")
   2363         f._CHUNK_SIZE  # Just test that it exists
   2364         f._CHUNK_SIZE = 2
   2365         f.readline()
   2366         f.tell()
   2367 
   2368     def test_seek_and_tell(self):
   2369         #Test seek/tell using the StatefulIncrementalDecoder.
   2370         # Make test faster by doing smaller seeks
   2371         CHUNK_SIZE = 128
   2372 
   2373         def test_seek_and_tell_with_data(data, min_pos=0):
   2374             """Tell/seek to various points within a data stream and ensure
   2375             that the decoded data returned by read() is consistent."""
   2376             f = self.open(support.TESTFN, 'wb')
   2377             f.write(data)
   2378             f.close()
   2379             f = self.open(support.TESTFN, encoding='test_decoder')
   2380             f._CHUNK_SIZE = CHUNK_SIZE
   2381             decoded = f.read()
   2382             f.close()
   2383 
   2384             for i in range(min_pos, len(decoded) + 1): # seek positions
   2385                 for j in [1, 5, len(decoded) - i]: # read lengths
   2386                     f = self.open(support.TESTFN, encoding='test_decoder')
   2387                     self.assertEqual(f.read(i), decoded[:i])
   2388                     cookie = f.tell()
   2389                     self.assertEqual(f.read(j), decoded[i:i + j])
   2390                     f.seek(cookie)
   2391                     self.assertEqual(f.read(), decoded[i:])
   2392                     f.close()
   2393 
   2394         # Enable the test decoder.
   2395         StatefulIncrementalDecoder.codecEnabled = 1
   2396 
   2397         # Run the tests.
   2398         try:
   2399             # Try each test case.
   2400             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2401                 test_seek_and_tell_with_data(input)
   2402 
   2403             # Position each test case so that it crosses a chunk boundary.
   2404             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2405                 offset = CHUNK_SIZE - len(input)//2
   2406                 prefix = b'.'*offset
   2407                 # Don't bother seeking into the prefix (takes too long).
   2408                 min_pos = offset*2
   2409                 test_seek_and_tell_with_data(prefix + input, min_pos)
   2410 
   2411         # Ensure our test decoder won't interfere with subsequent tests.
   2412         finally:
   2413             StatefulIncrementalDecoder.codecEnabled = 0
   2414 
   2415     def test_encoded_writes(self):
   2416         data = "1234567890"
   2417         tests = ("utf-16",
   2418                  "utf-16-le",
   2419                  "utf-16-be",
   2420                  "utf-32",
   2421                  "utf-32-le",
   2422                  "utf-32-be")
   2423         for encoding in tests:
   2424             buf = self.BytesIO()
   2425             f = self.TextIOWrapper(buf, encoding=encoding)
   2426             # Check if the BOM is written only once (see issue1753).
   2427             f.write(data)
   2428             f.write(data)
   2429             f.seek(0)
   2430             self.assertEqual(f.read(), data * 2)
   2431             f.seek(0)
   2432             self.assertEqual(f.read(), data * 2)
   2433             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
   2434 
   2435     def test_unreadable(self):
   2436         class UnReadable(self.BytesIO):
   2437             def readable(self):
   2438                 return False
   2439         txt = self.TextIOWrapper(UnReadable())
   2440         self.assertRaises(IOError, txt.read)
   2441 
   2442     def test_read_one_by_one(self):
   2443         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
   2444         reads = ""
   2445         while True:
   2446             c = txt.read(1)
   2447             if not c:
   2448                 break
   2449             reads += c
   2450         self.assertEqual(reads, "AA\nBB")
   2451 
   2452     def test_readlines(self):
   2453         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
   2454         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
   2455         txt.seek(0)
   2456         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
   2457         txt.seek(0)
   2458         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
   2459 
   2460     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
   2461     def test_read_by_chunk(self):
   2462         # make sure "\r\n" straddles 128 char boundary.
   2463         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
   2464         reads = ""
   2465         while True:
   2466             c = txt.read(128)
   2467             if not c:
   2468                 break
   2469             reads += c
   2470         self.assertEqual(reads, "A"*127+"\nB")
   2471 
   2472     def test_writelines(self):
   2473         l = ['ab', 'cd', 'ef']
   2474         buf = self.BytesIO()
   2475         txt = self.TextIOWrapper(buf)
   2476         txt.writelines(l)
   2477         txt.flush()
   2478         self.assertEqual(buf.getvalue(), b'abcdef')
   2479 
   2480     def test_writelines_userlist(self):
   2481         l = UserList(['ab', 'cd', 'ef'])
   2482         buf = self.BytesIO()
   2483         txt = self.TextIOWrapper(buf)
   2484         txt.writelines(l)
   2485         txt.flush()
   2486         self.assertEqual(buf.getvalue(), b'abcdef')
   2487 
   2488     def test_writelines_error(self):
   2489         txt = self.TextIOWrapper(self.BytesIO())
   2490         self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
   2491         self.assertRaises(TypeError, txt.writelines, None)
   2492         self.assertRaises(TypeError, txt.writelines, b'abc')
   2493 
   2494     def test_issue1395_1(self):
   2495         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2496 
   2497         # read one char at a time
   2498         reads = ""
   2499         while True:
   2500             c = txt.read(1)
   2501             if not c:
   2502                 break
   2503             reads += c
   2504         self.assertEqual(reads, self.normalized)
   2505 
   2506     def test_issue1395_2(self):
   2507         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2508         txt._CHUNK_SIZE = 4
   2509 
   2510         reads = ""
   2511         while True:
   2512             c = txt.read(4)
   2513             if not c:
   2514                 break
   2515             reads += c
   2516         self.assertEqual(reads, self.normalized)
   2517 
   2518     def test_issue1395_3(self):
   2519         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2520         txt._CHUNK_SIZE = 4
   2521 
   2522         reads = txt.read(4)
   2523         reads += txt.read(4)
   2524         reads += txt.readline()
   2525         reads += txt.readline()
   2526         reads += txt.readline()
   2527         self.assertEqual(reads, self.normalized)
   2528 
   2529     def test_issue1395_4(self):
   2530         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2531         txt._CHUNK_SIZE = 4
   2532 
   2533         reads = txt.read(4)
   2534         reads += txt.read()
   2535         self.assertEqual(reads, self.normalized)
   2536 
   2537     def test_issue1395_5(self):
   2538         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2539         txt._CHUNK_SIZE = 4
   2540 
   2541         reads = txt.read(4)
   2542         pos = txt.tell()
   2543         txt.seek(0)
   2544         txt.seek(pos)
   2545         self.assertEqual(txt.read(4), "BBB\n")
   2546 
   2547     def test_issue2282(self):
   2548         buffer = self.BytesIO(self.testdata)
   2549         txt = self.TextIOWrapper(buffer, encoding="ascii")
   2550 
   2551         self.assertEqual(buffer.seekable(), txt.seekable())
   2552 
   2553     def test_append_bom(self):
   2554         # The BOM is not written again when appending to a non-empty file
   2555         filename = support.TESTFN
   2556         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2557             with self.open(filename, 'w', encoding=charset) as f:
   2558                 f.write('aaa')
   2559                 pos = f.tell()
   2560             with self.open(filename, 'rb') as f:
   2561                 self.assertEqual(f.read(), 'aaa'.encode(charset))
   2562 
   2563             with self.open(filename, 'a', encoding=charset) as f:
   2564                 f.write('xxx')
   2565             with self.open(filename, 'rb') as f:
   2566                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
   2567 
   2568     def test_seek_bom(self):
   2569         # Same test, but when seeking manually
   2570         filename = support.TESTFN
   2571         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2572             with self.open(filename, 'w', encoding=charset) as f:
   2573                 f.write('aaa')
   2574                 pos = f.tell()
   2575             with self.open(filename, 'r+', encoding=charset) as f:
   2576                 f.seek(pos)
   2577                 f.write('zzz')
   2578                 f.seek(0)
   2579                 f.write('bbb')
   2580             with self.open(filename, 'rb') as f:
   2581                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
   2582 
   2583     def test_errors_property(self):
   2584         with self.open(support.TESTFN, "w") as f:
   2585             self.assertEqual(f.errors, "strict")
   2586         with self.open(support.TESTFN, "w", errors="replace") as f:
   2587             self.assertEqual(f.errors, "replace")
   2588 
   2589     @unittest.skipUnless(threading, 'Threading required for this test.')
   2590     def test_threads_write(self):
   2591         # Issue6750: concurrent writes could duplicate data
   2592         event = threading.Event()
   2593         with self.open(support.TESTFN, "w", buffering=1) as f:
   2594             def run(n):
   2595                 text = "Thread%03d\n" % n
   2596                 event.wait()
   2597                 f.write(text)
   2598             threads = [threading.Thread(target=run, args=(x,))
   2599                        for x in range(20)]
   2600             with support.start_threads(threads, event.set):
   2601                 time.sleep(0.02)
   2602         with self.open(support.TESTFN) as f:
   2603             content = f.read()
   2604             for n in range(20):
   2605                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
   2606 
   2607     def test_flush_error_on_close(self):
   2608         # Test that text file is closed despite failed flush
   2609         # and that flush() is called before file closed.
   2610         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2611         closed = []
   2612         def bad_flush():
   2613             closed[:] = [txt.closed, txt.buffer.closed]
   2614             raise IOError()
   2615         txt.flush = bad_flush
   2616         self.assertRaises(IOError, txt.close) # exception not swallowed
   2617         self.assertTrue(txt.closed)
   2618         self.assertTrue(txt.buffer.closed)
   2619         self.assertTrue(closed)      # flush() called
   2620         self.assertFalse(closed[0])  # flush() called before file closed
   2621         self.assertFalse(closed[1])
   2622         txt.flush = lambda: None  # break reference loop
   2623 
   2624     def test_multi_close(self):
   2625         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2626         txt.close()
   2627         txt.close()
   2628         txt.close()
   2629         self.assertRaises(ValueError, txt.flush)
   2630 
   2631     def test_readonly_attributes(self):
   2632         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2633         buf = self.BytesIO(self.testdata)
   2634         with self.assertRaises((AttributeError, TypeError)):
   2635             txt.buffer = buf
   2636 
   2637     def test_read_nonbytes(self):
   2638         # Issue #17106
   2639         # Crash when underlying read() returns non-bytes
   2640         class NonbytesStream(self.StringIO):
   2641             read1 = self.StringIO.read
   2642         class NonbytesStream(self.StringIO):
   2643             read1 = self.StringIO.read
   2644         t = self.TextIOWrapper(NonbytesStream('a'))
   2645         with self.maybeRaises(TypeError):
   2646             t.read(1)
   2647         t = self.TextIOWrapper(NonbytesStream('a'))
   2648         with self.maybeRaises(TypeError):
   2649             t.readline()
   2650         t = self.TextIOWrapper(NonbytesStream('a'))
   2651         self.assertEqual(t.read(), u'a')
   2652 
   2653     def test_illegal_decoder(self):
   2654         # Issue #17106
   2655         # Bypass the early encoding check added in issue 20404
   2656         def _make_illegal_wrapper():
   2657             quopri = codecs.lookup("quopri_codec")
   2658             quopri._is_text_encoding = True
   2659             try:
   2660                 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
   2661                                        newline='\n', encoding="quopri_codec")
   2662             finally:
   2663                 quopri._is_text_encoding = False
   2664             return t
   2665         # Crash when decoder returns non-string
   2666         with support.check_py3k_warnings():
   2667             t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2668                                    encoding='quopri_codec')
   2669         with self.maybeRaises(TypeError):
   2670             t.read(1)
   2671         with support.check_py3k_warnings():
   2672             t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2673                                    encoding='quopri_codec')
   2674         with self.maybeRaises(TypeError):
   2675             t.readline()
   2676         with support.check_py3k_warnings():
   2677             t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2678                                    encoding='quopri_codec')
   2679         with self.maybeRaises(TypeError):
   2680             t.read()
   2681         #else:
   2682             #t = _make_illegal_wrapper()
   2683             #self.assertRaises(TypeError, t.read, 1)
   2684             #t = _make_illegal_wrapper()
   2685             #self.assertRaises(TypeError, t.readline)
   2686             #t = _make_illegal_wrapper()
   2687             #self.assertRaises(TypeError, t.read)
   2688 
   2689 
   2690 class CTextIOWrapperTest(TextIOWrapperTest):
   2691 
   2692     def test_initialization(self):
   2693         r = self.BytesIO(b"\xc3\xa9\n\n")
   2694         b = self.BufferedReader(r, 1000)
   2695         t = self.TextIOWrapper(b)
   2696         self.assertRaises(TypeError, t.__init__, b, newline=42)
   2697         self.assertRaises(ValueError, t.read)
   2698         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   2699         self.assertRaises(ValueError, t.read)
   2700 
   2701         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   2702         self.assertRaises(Exception, repr, t)
   2703 
   2704     def test_garbage_collection(self):
   2705         # C TextIOWrapper objects are collected, and collecting them flushes
   2706         # all data to disk.
   2707         # The Python version has __del__, so it ends in gc.garbage instead.
   2708         rawio = io.FileIO(support.TESTFN, "wb")
   2709         b = self.BufferedWriter(rawio)
   2710         t = self.TextIOWrapper(b, encoding="ascii")
   2711         t.write("456def")
   2712         t.x = t
   2713         wr = weakref.ref(t)
   2714         del t
   2715         support.gc_collect()
   2716         self.assertIsNone(wr(), wr)
   2717         with self.open(support.TESTFN, "rb") as f:
   2718             self.assertEqual(f.read(), b"456def")
   2719 
   2720     def test_rwpair_cleared_before_textio(self):
   2721         # Issue 13070: TextIOWrapper's finalization would crash when called
   2722         # after the reference to the underlying BufferedRWPair's writer got
   2723         # cleared by the GC.
   2724         for i in range(1000):
   2725             b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   2726             t1 = self.TextIOWrapper(b1, encoding="ascii")
   2727             b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   2728             t2 = self.TextIOWrapper(b2, encoding="ascii")
   2729             # circular references
   2730             t1.buddy = t2
   2731             t2.buddy = t1
   2732         support.gc_collect()
   2733 
   2734     maybeRaises = unittest.TestCase.assertRaises
   2735 
   2736 
   2737 class PyTextIOWrapperTest(TextIOWrapperTest):
   2738     @contextlib.contextmanager
   2739     def maybeRaises(self, *args, **kwds):
   2740         yield
   2741 
   2742 
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for self.IncrementalNewlineDecoder.  That attribute is
    # not defined in this class body; it is expected to be provided for
    # the concrete subclasses elsewhere in the file.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            # Decoding b must give s, and must give s again after the
            # state saved beforehand has been restored.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # A complete three-byte sequence decodes in a single call.
        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # Fed one byte at a time, nothing comes out until the last byte.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A sequence cut short is an error once final=True is passed.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # A lone "\r" produces no output until more input arrives or
        # final=True is passed; it then comes out as "\n".
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        # Line endings directly after a multi-byte character.
        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder piece by piece and check both the
        # accumulated output and the ``newlines`` attribute along the way.
        # With an encoding, the text is first encoded incrementally and
        # the encoder's output is fed element by element; with
        # encoding=None the characters of the text are fed directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        # The trailing "\r" is not reported yet.
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also clear the record of newlines seen.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # ``enc and ...`` leaves decoder as None when enc is None.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
   2848 
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited.
    # NOTE(review): presumably the C IncrementalNewlineDecoder is attached
    # to this class elsewhere in the file -- confirm.
    pass
   2851 
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited.
    # NOTE(review): presumably the pure-Python IncrementalNewlineDecoder is
    # attached to this class elsewhere in the file -- confirm.
    pass
   2854 
   2855 
   2856 # XXX Tests for open()
   2857 
   2858 class MiscIOTest(unittest.TestCase):
   2859 
   2860     def tearDown(self):
   2861         support.unlink(support.TESTFN)
   2862 
   2863     def test___all__(self):
   2864         for name in self.io.__all__:
   2865             obj = getattr(self.io, name, None)
   2866             self.assertIsNotNone(obj, name)
   2867             if name == "open":
   2868                 continue
   2869             elif "error" in name.lower() or name == "UnsupportedOperation":
   2870                 self.assertTrue(issubclass(obj, Exception), name)
   2871             elif not name.startswith("SEEK_"):
   2872                 self.assertTrue(issubclass(obj, self.IOBase))
   2873 
   2874     def test_attributes(self):
   2875         f = self.open(support.TESTFN, "wb", buffering=0)
   2876         self.assertEqual(f.mode, "wb")
   2877         f.close()
   2878 
   2879         f = self.open(support.TESTFN, "U")
   2880         self.assertEqual(f.name,            support.TESTFN)
   2881         self.assertEqual(f.buffer.name,     support.TESTFN)
   2882         self.assertEqual(f.buffer.raw.name, support.TESTFN)
   2883         self.assertEqual(f.mode,            "U")
   2884         self.assertEqual(f.buffer.mode,     "rb")
   2885         self.assertEqual(f.buffer.raw.mode, "rb")
   2886         f.close()
   2887 
   2888         f = self.open(support.TESTFN, "w+")
   2889         self.assertEqual(f.mode,            "w+")
   2890         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
   2891         self.assertEqual(f.buffer.raw.mode, "rb+")
   2892 
   2893         g = self.open(f.fileno(), "wb", closefd=False)
   2894         self.assertEqual(g.mode,     "wb")
   2895         self.assertEqual(g.raw.mode, "wb")
   2896         self.assertEqual(g.name,     f.fileno())
   2897         self.assertEqual(g.raw.name, f.fileno())
   2898         f.close()
   2899         g.close()
   2900 
   2901     def test_io_after_close(self):
   2902         for kwargs in [
   2903                 {"mode": "w"},
   2904                 {"mode": "wb"},
   2905                 {"mode": "w", "buffering": 1},
   2906                 {"mode": "w", "buffering": 2},
   2907                 {"mode": "wb", "buffering": 0},
   2908                 {"mode": "r"},
   2909                 {"mode": "rb"},
   2910                 {"mode": "r", "buffering": 1},
   2911                 {"mode": "r", "buffering": 2},
   2912                 {"mode": "rb", "buffering": 0},
   2913                 {"mode": "w+"},
   2914                 {"mode": "w+b"},
   2915                 {"mode": "w+", "buffering": 1},
   2916                 {"mode": "w+", "buffering": 2},
   2917                 {"mode": "w+b", "buffering": 0},
   2918             ]:
   2919             f = self.open(support.TESTFN, **kwargs)
   2920             f.close()
   2921             self.assertRaises(ValueError, f.flush)
   2922             self.assertRaises(ValueError, f.fileno)
   2923             self.assertRaises(ValueError, f.isatty)
   2924             self.assertRaises(ValueError, f.__iter__)
   2925             if hasattr(f, "peek"):
   2926                 self.assertRaises(ValueError, f.peek, 1)
   2927             self.assertRaises(ValueError, f.read)
   2928             if hasattr(f, "read1"):
   2929                 self.assertRaises(ValueError, f.read1, 1024)
   2930             if hasattr(f, "readall"):
   2931                 self.assertRaises(ValueError, f.readall)
   2932             if hasattr(f, "readinto"):
   2933                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
   2934             self.assertRaises(ValueError, f.readline)
   2935             self.assertRaises(ValueError, f.readlines)
   2936             self.assertRaises(ValueError, f.seek, 0)
   2937             self.assertRaises(ValueError, f.tell)
   2938             self.assertRaises(ValueError, f.truncate)
   2939             self.assertRaises(ValueError, f.write,
   2940                               b"" if "b" in kwargs['mode'] else "")
   2941             self.assertRaises(ValueError, f.writelines, [])
   2942             self.assertRaises(ValueError, next, f)
   2943 
   2944     def test_blockingioerror(self):
   2945         # Various BlockingIOError issues
   2946         self.assertRaises(TypeError, self.BlockingIOError)
   2947         self.assertRaises(TypeError, self.BlockingIOError, 1)
   2948         self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
   2949         self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
   2950         b = self.BlockingIOError(1, "")
   2951         self.assertEqual(b.characters_written, 0)
   2952         class C(unicode):
   2953             pass
   2954         c = C("")
   2955         b = self.BlockingIOError(1, c)
   2956         c.b = b
   2957         b.c = c
   2958         wr = weakref.ref(c)
   2959         del c, b
   2960         support.gc_collect()
   2961         self.assertIsNone(wr(), wr)
   2962 
   2963     def test_abcs(self):
   2964         # Test the visible base classes are ABCs.
   2965         self.assertIsInstance(self.IOBase, abc.ABCMeta)
   2966         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
   2967         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
   2968         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
   2969 
   2970     def _check_abc_inheritance(self, abcmodule):
   2971         with self.open(support.TESTFN, "wb", buffering=0) as f:
   2972             self.assertIsInstance(f, abcmodule.IOBase)
   2973             self.assertIsInstance(f, abcmodule.RawIOBase)
   2974             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2975             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2976         with self.open(support.TESTFN, "wb") as f:
   2977             self.assertIsInstance(f, abcmodule.IOBase)
   2978             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2979             self.assertIsInstance(f, abcmodule.BufferedIOBase)
   2980             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2981         with self.open(support.TESTFN, "w") as f:
   2982             self.assertIsInstance(f, abcmodule.IOBase)
   2983             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2984             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2985             self.assertIsInstance(f, abcmodule.TextIOBase)
   2986 
   2987     def test_abc_inheritance(self):
   2988         # Test implementations inherit from their respective ABCs
   2989         self._check_abc_inheritance(self)
   2990 
   2991     def test_abc_inheritance_official(self):
   2992         # Test implementations inherit from the official ABCs of the
   2993         # baseline "io" module.
   2994         self._check_abc_inheritance(io)
   2995 
   2996     @unittest.skipUnless(fcntl, 'fcntl required for this test')
   2997     def test_nonblock_pipe_write_bigbuf(self):
   2998         self._test_nonblock_pipe_write(16*1024)
   2999 
   3000     @unittest.skipUnless(fcntl, 'fcntl required for this test')
   3001     def test_nonblock_pipe_write_smallbuf(self):
   3002         self._test_nonblock_pipe_write(1024)
   3003 
   3004     def _set_non_blocking(self, fd):
   3005         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
   3006         self.assertNotEqual(flags, -1)
   3007         res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
   3008         self.assertEqual(res, 0)
   3009 
   3010     def _test_nonblock_pipe_write(self, bufsize):
   3011         sent = []
   3012         received = []
   3013         r, w = os.pipe()
   3014         self._set_non_blocking(r)
   3015         self._set_non_blocking(w)
   3016 
   3017         # To exercise all code paths in the C implementation we need
   3018         # to play with buffer sizes.  For instance, if we choose a
   3019         # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
   3020         # then we will never get a partial write of the buffer.
   3021         rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
   3022         wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
   3023 
   3024         with rf, wf:
   3025             for N in 9999, 73, 7574:
   3026                 try:
   3027                     i = 0
   3028                     while True:
   3029                         msg = bytes([i % 26 + 97]) * N
   3030                         sent.append(msg)
   3031                         wf.write(msg)
   3032                         i += 1
   3033 
   3034                 except self.BlockingIOError as e:
   3035                     self.assertEqual(e.args[0], errno.EAGAIN)
   3036                     sent[-1] = sent[-1][:e.characters_written]
   3037                     received.append(rf.read())
   3038                     msg = b'BLOCKED'
   3039                     wf.write(msg)
   3040                     sent.append(msg)
   3041 
   3042             while True:
   3043                 try:
   3044                     wf.flush()
   3045                     break
   3046                 except self.BlockingIOError as e:
   3047                     self.assertEqual(e.args[0], errno.EAGAIN)
   3048                     self.assertEqual(e.characters_written, 0)
   3049                     received.append(rf.read())
   3050 
   3051             received += iter(rf.read, None)
   3052 
   3053         sent, received = b''.join(sent), b''.join(received)
   3054         self.assertEqual(sent, received)
   3055         self.assertTrue(wf.closed)
   3056         self.assertTrue(rf.closed)
   3057 
   3058 class CMiscIOTest(MiscIOTest):
   3059     io = io
   3060     shutdown_error = "RuntimeError: could not find io module state"
   3061 
   3062 class PyMiscIOTest(MiscIOTest):
   3063     io = pyio
   3064     shutdown_error = "LookupError: unknown encoding: ascii"
   3065 
   3066 
   3067 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
   3068 class SignalsTest(unittest.TestCase):
   3069 
   3070     def setUp(self):
   3071         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
   3072 
   3073     def tearDown(self):
   3074         signal.signal(signal.SIGALRM, self.oldalrm)
   3075 
   3076     def alarm_interrupt(self, sig, frame):
   3077         1 // 0
   3078 
   3079     @unittest.skipUnless(threading, 'Threading required for this test.')
   3080     @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
   3081                      'issue #12429: skip test on FreeBSD <= 7')
   3082     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
   3083         """Check that a partial write, when it gets interrupted, properly
   3084         invokes the signal handler, and bubbles up the exception raised
   3085         in the latter."""
   3086         read_results = []
   3087         def _read():
   3088             s = os.read(r, 1)
   3089             read_results.append(s)
   3090         t = threading.Thread(target=_read)
   3091         t.daemon = True
   3092         r, w = os.pipe()
   3093         try:
   3094             wio = self.io.open(w, **fdopen_kwargs)
   3095             t.start()
   3096             signal.alarm(1)
   3097             # Fill the pipe enough that the write will be blocking.
   3098             # It will be interrupted by the timer armed above.  Since the
   3099             # other thread has read one byte, the low-level write will
   3100             # return with a successful (partial) result rather than an EINTR.
   3101             # The buffered IO layer must check for pending signal
   3102             # handlers, which in this case will invoke alarm_interrupt().
   3103             try:
   3104                 with self.assertRaises(ZeroDivisionError):
   3105                     wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
   3106             finally:
   3107                 t.join()
   3108             # We got one byte, get another one and check that it isn't a
   3109             # repeat of the first one.
   3110             read_results.append(os.read(r, 1))
   3111             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
   3112         finally:
   3113             os.close(w)
   3114             os.close(r)
   3115             # This is deliberate. If we didn't close the file descriptor
   3116             # before closing wio, wio would try to flush its internal
   3117             # buffer, and block again.
   3118             try:
   3119                 wio.close()
   3120             except IOError as e:
   3121                 if e.errno != errno.EBADF:
   3122                     raise
   3123 
   3124     def test_interrupted_write_unbuffered(self):
   3125         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
   3126 
   3127     def test_interrupted_write_buffered(self):
   3128         self.check_interrupted_write(b"xy", b"xy", mode="wb")
   3129 
   3130     def test_interrupted_write_text(self):
   3131         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
   3132 
   3133     def check_reentrant_write(self, data, **fdopen_kwargs):
   3134         def on_alarm(*args):
   3135             # Will be called reentrantly from the same thread
   3136             wio.write(data)
   3137             1//0
   3138         signal.signal(signal.SIGALRM, on_alarm)
   3139         r, w = os.pipe()
   3140         wio = self.io.open(w, **fdopen_kwargs)
   3141         try:
   3142             signal.alarm(1)
   3143             # Either the reentrant call to wio.write() fails with RuntimeError,
   3144             # or the signal handler raises ZeroDivisionError.
   3145             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
   3146                 while 1:
   3147                     for i in range(100):
   3148                         wio.write(data)
   3149                         wio.flush()
   3150                     # Make sure the buffer doesn't fill up and block further writes
   3151                     os.read(r, len(data) * 100)
   3152             exc = cm.exception
   3153             if isinstance(exc, RuntimeError):
   3154                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
   3155         finally:
   3156             wio.close()
   3157             os.close(r)
   3158 
   3159     def test_reentrant_write_buffered(self):
   3160         self.check_reentrant_write(b"xy", mode="wb")
   3161 
   3162     def test_reentrant_write_text(self):
   3163         self.check_reentrant_write("xy", mode="w", encoding="ascii")
   3164 
   3165     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
   3166         """Check that a buffered read, when it gets interrupted (either
   3167         returning a partial result or EINTR), properly invokes the signal
   3168         handler and retries if the latter returned successfully."""
   3169         r, w = os.pipe()
   3170         fdopen_kwargs["closefd"] = False
   3171         def alarm_handler(sig, frame):
   3172             os.write(w, b"bar")
   3173         signal.signal(signal.SIGALRM, alarm_handler)
   3174         try:
   3175             rio = self.io.open(r, **fdopen_kwargs)
   3176             os.write(w, b"foo")
   3177             signal.alarm(1)
   3178             # Expected behaviour:
   3179             # - first raw read() returns partial b"foo"
   3180             # - second raw read() returns EINTR
   3181             # - third raw read() returns b"bar"
   3182             self.assertEqual(decode(rio.read(6)), "foobar")
   3183         finally:
   3184             rio.close()
   3185             os.close(w)
   3186             os.close(r)
   3187 
   3188     def test_interrupterd_read_retry_buffered(self):
   3189         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
   3190                                           mode="rb")
   3191 
   3192     def test_interrupterd_read_retry_text(self):
   3193         self.check_interrupted_read_retry(lambda x: x,
   3194                                           mode="r")
   3195 
   3196     @unittest.skipUnless(threading, 'Threading required for this test.')
   3197     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
   3198         """Check that a buffered write, when it gets interrupted (either
   3199         returning a partial result or EINTR), properly invokes the signal
   3200         handler and retries if the latter returned successfully."""
   3201         select = support.import_module("select")
   3202         # A quantity that exceeds the buffer size of an anonymous pipe's
   3203         # write end.
   3204         N = support.PIPE_MAX_SIZE
   3205         r, w = os.pipe()
   3206         fdopen_kwargs["closefd"] = False
   3207         # We need a separate thread to read from the pipe and allow the
   3208         # write() to finish.  This thread is started after the SIGALRM is
   3209         # received (forcing a first EINTR in write()).
   3210         read_results = []
   3211         write_finished = False
   3212         error = [None]
   3213         def _read():
   3214             try:
   3215                 while not write_finished:
   3216                     while r in select.select([r], [], [], 1.0)[0]:
   3217                         s = os.read(r, 1024)
   3218                         read_results.append(s)
   3219             except BaseException as exc:
   3220                 error[0] = exc
   3221         t = threading.Thread(target=_read)
   3222         t.daemon = True
   3223         def alarm1(sig, frame):
   3224             signal.signal(signal.SIGALRM, alarm2)
   3225             signal.alarm(1)
   3226         def alarm2(sig, frame):
   3227             t.start()
   3228         signal.signal(signal.SIGALRM, alarm1)
   3229         try:
   3230             wio = self.io.open(w, **fdopen_kwargs)
   3231             signal.alarm(1)
   3232             # Expected behaviour:
   3233             # - first raw write() is partial (because of the limited pipe buffer
   3234             #   and the first alarm)
   3235             # - second raw write() returns EINTR (because of the second alarm)
   3236             # - subsequent write()s are successful (either partial or complete)
   3237             self.assertEqual(N, wio.write(item * N))
   3238             wio.flush()
   3239             write_finished = True
   3240             t.join()
   3241 
   3242             self.assertIsNone(error[0])
   3243             self.assertEqual(N, sum(len(x) for x in read_results))
   3244         finally:
   3245             write_finished = True
   3246             os.close(w)
   3247             os.close(r)
   3248             # This is deliberate. If we didn't close the file descriptor
   3249             # before closing wio, wio would try to flush its internal
   3250             # buffer, and could block (in case of failure).
   3251             try:
   3252                 wio.close()
   3253             except IOError as e:
   3254                 if e.errno != errno.EBADF:
   3255                     raise
   3256 
   3257     def test_interrupterd_write_retry_buffered(self):
   3258         self.check_interrupted_write_retry(b"x", mode="wb")
   3259 
   3260     def test_interrupterd_write_retry_text(self):
   3261         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
   3262 
   3263 
   3264 class CSignalsTest(SignalsTest):
   3265     io = io
   3266 
   3267 class PySignalsTest(SignalsTest):
   3268     io = pyio
   3269 
   3270     # Handling reentrancy issues would slow down _pyio even more, so the
   3271     # tests are disabled.
   3272     test_reentrant_write_buffered = None
   3273     test_reentrant_write_text = None
   3274 
   3275 
   3276 def test_main():
   3277     tests = (CIOTest, PyIOTest,
   3278              CBufferedReaderTest, PyBufferedReaderTest,
   3279              CBufferedWriterTest, PyBufferedWriterTest,
   3280              CBufferedRWPairTest, PyBufferedRWPairTest,
   3281              CBufferedRandomTest, PyBufferedRandomTest,
   3282              StatefulIncrementalDecoderTest,
   3283              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
   3284              CTextIOWrapperTest, PyTextIOWrapperTest,
   3285              CMiscIOTest, PyMiscIOTest,
   3286              CSignalsTest, PySignalsTest,
   3287              )
   3288 
   3289     # Put the namespaces of the IO module we are testing and some useful mock
   3290     # classes in the __dict__ of each test.
   3291     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
   3292              MockNonBlockWriterIO, MockRawIOWithoutRead)
   3293     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
   3294     c_io_ns = dict((name, getattr(io, name)) for name in all_members)
   3295     py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
   3296     globs = globals()
   3297     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
   3298     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
   3299     # Avoid turning open into a bound method.
   3300     py_io_ns["open"] = pyio.OpenWrapper
   3301     for test in tests:
   3302         if test.__name__.startswith("C"):
   3303             for name, obj in c_io_ns.items():
   3304                 setattr(test, name, obj)
   3305         elif test.__name__.startswith("Py"):
   3306             for name, obj in py_io_ns.items():
   3307                 setattr(test, name, obj)
   3308 
   3309     support.run_unittest(*tests)
   3310 
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
   3313