Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import unittest
      4 import itertools
      5 import select
      6 import signal
      7 import stat
      8 import subprocess
      9 import time
     10 from array import array
     11 from weakref import proxy
     12 try:
     13     import threading
     14 except ImportError:
     15     threading = None
     16 
     17 from test import test_support
     18 from test.test_support import TESTFN, run_unittest, requires
     19 from UserList import UserList
     20 
     21 class AutoFileTests(unittest.TestCase):
     22     # file tests for which a test file is automatically set up
     23 
     24     def setUp(self):
     25         self.f = open(TESTFN, 'wb')
     26 
     27     def tearDown(self):
     28         if self.f:
     29             self.f.close()
     30         os.remove(TESTFN)
     31 
     32     def testWeakRefs(self):
     33         # verify weak references
     34         p = proxy(self.f)
     35         p.write('teststring')
     36         self.assertEqual(self.f.tell(), p.tell())
     37         self.f.close()
     38         self.f = None
     39         self.assertRaises(ReferenceError, getattr, p, 'tell')
     40 
     41     def testAttributes(self):
     42         # verify expected attributes exist
     43         f = self.f
     44         with test_support.check_py3k_warnings():
     45             softspace = f.softspace
     46         f.name     # merely shouldn't blow up
     47         f.mode     # ditto
     48         f.closed   # ditto
     49 
     50         with test_support.check_py3k_warnings():
     51             # verify softspace is writable
     52             f.softspace = softspace    # merely shouldn't blow up
     53 
     54         # verify the others aren't
     55         for attr in 'name', 'mode', 'closed':
     56             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
     57 
     58     def testReadinto(self):
     59         # verify readinto
     60         self.f.write('12')
     61         self.f.close()
     62         a = array('c', 'x'*10)
     63         self.f = open(TESTFN, 'rb')
     64         n = self.f.readinto(a)
     65         self.assertEqual('12', a.tostring()[:n])
     66 
     67     def testWritelinesUserList(self):
     68         # verify writelines with instance sequence
     69         l = UserList(['1', '2'])
     70         self.f.writelines(l)
     71         self.f.close()
     72         self.f = open(TESTFN, 'rb')
     73         buf = self.f.read()
     74         self.assertEqual(buf, '12')
     75 
     76     def testWritelinesIntegers(self):
     77         # verify writelines with integers
     78         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     79 
     80     def testWritelinesIntegersUserList(self):
     81         # verify writelines with integers in UserList
     82         l = UserList([1,2,3])
     83         self.assertRaises(TypeError, self.f.writelines, l)
     84 
     85     def testWritelinesNonString(self):
     86         # verify writelines with non-string object
     87         class NonString:
     88             pass
     89 
     90         self.assertRaises(TypeError, self.f.writelines,
     91                           [NonString(), NonString()])
     92 
     93     def testWritelinesBuffer(self):
     94         self.f.writelines([array('c', 'abc')])
     95         self.f.close()
     96         self.f = open(TESTFN, 'rb')
     97         buf = self.f.read()
     98         self.assertEqual(buf, 'abc')
     99 
    100     def testRepr(self):
    101         # verify repr works
    102         self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
    103         # see issue #14161
    104         # Windows doesn't like \r\n\t" in the file name, but ' is ok
    105         fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
    106         with open(fname, 'w') as f:
    107             self.addCleanup(os.remove, fname)
    108             self.assertTrue(repr(f).startswith(
    109                     "<open file %r, mode 'w' at" % fname))
    110 
    111     def testErrors(self):
    112         self.f.close()
    113         self.f = open(TESTFN, 'rb')
    114         f = self.f
    115         self.assertEqual(f.name, TESTFN)
    116         self.assertTrue(not f.isatty())
    117         self.assertTrue(not f.closed)
    118 
    119         self.assertRaises(TypeError, f.readinto, "")
    120         f.close()
    121         self.assertTrue(f.closed)
    122 
    123     def testMethods(self):
    124         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
    125                    'readline', 'readlines', 'seek', 'tell', 'truncate',
    126                    'write', '__iter__']
    127         deprecated_methods = ['xreadlines']
    128         if sys.platform.startswith('atheos'):
    129             methods.remove('truncate')
    130 
    131         # __exit__ should close the file
    132         self.f.__exit__(None, None, None)
    133         self.assertTrue(self.f.closed)
    134 
    135         for methodname in methods:
    136             method = getattr(self.f, methodname)
    137             # should raise on closed file
    138             self.assertRaises(ValueError, method)
    139         with test_support.check_py3k_warnings():
    140             for methodname in deprecated_methods:
    141                 method = getattr(self.f, methodname)
    142                 self.assertRaises(ValueError, method)
    143         self.assertRaises(ValueError, self.f.writelines, [])
    144 
    145         # file is closed, __exit__ shouldn't do anything
    146         self.assertEqual(self.f.__exit__(None, None, None), None)
    147         # it must also return None if an exception was given
    148         try:
    149             1 // 0
    150         except:
    151             self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
    152 
    153     def testReadWhenWriting(self):
    154         self.assertRaises(IOError, self.f.read)
    155 
    156     def testNastyWritelinesGenerator(self):
    157         def nasty():
    158             for i in range(5):
    159                 if i == 3:
    160                     self.f.close()
    161                 yield str(i)
    162         self.assertRaises(ValueError, self.f.writelines, nasty())
    163 
    164     def testIssue5677(self):
    165         # Remark: Do not perform more than one test per open file,
    166         # since that does NOT catch the readline error on Windows.
    167         data = 'xxx'
    168         for mode in ['w', 'wb', 'a', 'ab']:
    169             for attr in ['read', 'readline', 'readlines']:
    170                 self.f = open(TESTFN, mode)
    171                 self.f.write(data)
    172                 self.assertRaises(IOError, getattr(self.f, attr))
    173                 self.f.close()
    174 
    175             self.f = open(TESTFN, mode)
    176             self.f.write(data)
    177             self.assertRaises(IOError, lambda: [line for line in self.f])
    178             self.f.close()
    179 
    180             self.f = open(TESTFN, mode)
    181             self.f.write(data)
    182             self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
    183             self.f.close()
    184 
    185         for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
    186             self.f = open(TESTFN, mode)
    187             self.assertRaises(IOError, self.f.write, data)
    188             self.f.close()
    189 
    190             self.f = open(TESTFN, mode)
    191             self.assertRaises(IOError, self.f.writelines, [data, data])
    192             self.f.close()
    193 
    194             self.f = open(TESTFN, mode)
    195             self.assertRaises(IOError, self.f.truncate)
    196             self.f.close()
    197 
    198 class OtherFileTests(unittest.TestCase):
    199 
    200     def testOpenDir(self):
    201         this_dir = os.path.dirname(__file__) or os.curdir
    202         for mode in (None, "w"):
    203             try:
    204                 if mode:
    205                     f = open(this_dir, mode)
    206                 else:
    207                     f = open(this_dir)
    208             except IOError as e:
    209                 self.assertEqual(e.filename, this_dir)
    210             else:
    211                 self.fail("opening a directory didn't raise an IOError")
    212 
    213     def testModeStrings(self):
    214         # check invalid mode strings
    215         for mode in ("", "aU", "wU+"):
    216             try:
    217                 f = open(TESTFN, mode)
    218             except ValueError:
    219                 pass
    220             else:
    221                 f.close()
    222                 self.fail('%r is an invalid file mode' % mode)
    223 
    224         # Some invalid modes fail on Windows, but pass on Unix
    225         # Issue3965: avoid a crash on Windows when filename is unicode
    226         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
    227             try:
    228                 f = open(name, "rr")
    229             except (IOError, ValueError):
    230                 pass
    231             else:
    232                 f.close()
    233 
    234     def testStdinSeek(self):
    235         if sys.platform == 'osf1V5':
    236             # This causes the interpreter to exit on OSF1 v5.1.
    237             self.skipTest('Skipping sys.stdin.seek(-1), it may crash '
    238                           'the interpreter. Test manually.')
    239 
    240         if not sys.stdin.isatty():
    241             # Issue #23168: if stdin is redirected to a file, stdin becomes
    242             # seekable
    243             self.skipTest('stdin must be a TTY in this test')
    244 
    245         self.assertRaises(IOError, sys.stdin.seek, -1)
    246 
    247     def testStdinTruncate(self):
    248         self.assertRaises(IOError, sys.stdin.truncate)
    249 
    250     def testUnicodeOpen(self):
    251         # verify repr works for unicode too
    252         f = open(unicode(TESTFN), "w")
    253         self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
    254         f.close()
    255         os.unlink(TESTFN)
    256 
    257     def testBadModeArgument(self):
    258         # verify that we get a sensible error message for bad mode argument
    259         bad_mode = "qwerty"
    260         try:
    261             f = open(TESTFN, bad_mode)
    262         except ValueError, msg:
    263             if msg.args[0] != 0:
    264                 s = str(msg)
    265                 if TESTFN in s or bad_mode not in s:
    266                     self.fail("bad error message for invalid mode: %s" % s)
    267             # if msg.args[0] == 0, we're probably on Windows where there may
    268             # be no obvious way to discover why open() failed.
    269         else:
    270             f.close()
    271             self.fail("no error for invalid mode: %s" % bad_mode)
    272 
    273     def testSetBufferSize(self):
    274         # make sure that explicitly setting the buffer size doesn't cause
    275         # misbehaviour especially with repeated close() calls
    276         for s in (-1, 0, 1, 512):
    277             try:
    278                 f = open(TESTFN, 'w', s)
    279                 f.write(str(s))
    280                 f.close()
    281                 f.close()
    282                 f = open(TESTFN, 'r', s)
    283                 d = int(f.read())
    284                 f.close()
    285                 f.close()
    286             except IOError, msg:
    287                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
    288             self.assertEqual(d, s)
    289 
    290     def testTruncateOnWindows(self):
    291         os.unlink(TESTFN)
    292 
    293         def bug801631():
    294             # SF bug <http://www.python.org/sf/801631>
    295             # "file.truncate fault on windows"
    296             f = open(TESTFN, 'wb')
    297             f.write('12345678901')   # 11 bytes
    298             f.close()
    299 
    300             f = open(TESTFN,'rb+')
    301             data = f.read(5)
    302             if data != '12345':
    303                 self.fail("Read on file opened for update failed %r" % data)
    304             if f.tell() != 5:
    305                 self.fail("File pos after read wrong %d" % f.tell())
    306 
    307             f.truncate()
    308             if f.tell() != 5:
    309                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    310 
    311             f.close()
    312             size = os.path.getsize(TESTFN)
    313             if size != 5:
    314                 self.fail("File size after ftruncate wrong %d" % size)
    315 
    316         try:
    317             bug801631()
    318         finally:
    319             os.unlink(TESTFN)
    320 
    321     def testIteration(self):
    322         # Test the complex interaction when mixing file-iteration and the
    323         # various read* methods. Ostensibly, the mixture could just be tested
    324         # to work when it should work according to the Python language,
    325         # instead of fail when it should fail according to the current CPython
    326         # implementation.  People don't always program Python the way they
    327         # should, though, and the implemenation might change in subtle ways,
    328         # so we explicitly test for errors, too; the test will just have to
    329         # be updated when the implementation changes.
    330         dataoffset = 16384
    331         filler = "ham\n"
    332         assert not dataoffset % len(filler), \
    333             "dataoffset must be multiple of len(filler)"
    334         nchunks = dataoffset // len(filler)
    335         testlines = [
    336             "spam, spam and eggs\n",
    337             "eggs, spam, ham and spam\n",
    338             "saussages, spam, spam and eggs\n",
    339             "spam, ham, spam and eggs\n",
    340             "spam, spam, spam, spam, spam, ham, spam\n",
    341             "wonderful spaaaaaam.\n"
    342         ]
    343         methods = [("readline", ()), ("read", ()), ("readlines", ()),
    344                    ("readinto", (array("c", " "*100),))]
    345 
    346         try:
    347             # Prepare the testfile
    348             bag = open(TESTFN, "w")
    349             bag.write(filler * nchunks)
    350             bag.writelines(testlines)
    351             bag.close()
    352             # Test for appropriate errors mixing read* and iteration
    353             for methodname, args in methods:
    354                 f = open(TESTFN)
    355                 if f.next() != filler:
    356                     self.fail, "Broken testfile"
    357                 meth = getattr(f, methodname)
    358                 try:
    359                     meth(*args)
    360                 except ValueError:
    361                     pass
    362                 else:
    363                     self.fail("%s%r after next() didn't raise ValueError" %
    364                                      (methodname, args))
    365                 f.close()
    366 
    367             # Test to see if harmless (by accident) mixing of read* and
    368             # iteration still works. This depends on the size of the internal
    369             # iteration buffer (currently 8192,) but we can test it in a
    370             # flexible manner.  Each line in the bag o' ham is 4 bytes
    371             # ("h", "a", "m", "\n"), so 4096 lines of that should get us
    372             # exactly on the buffer boundary for any power-of-2 buffersize
    373             # between 4 and 16384 (inclusive).
    374             f = open(TESTFN)
    375             for i in range(nchunks):
    376                 f.next()
    377             testline = testlines.pop(0)
    378             try:
    379                 line = f.readline()
    380             except ValueError:
    381                 self.fail("readline() after next() with supposedly empty "
    382                           "iteration-buffer failed anyway")
    383             if line != testline:
    384                 self.fail("readline() after next() with empty buffer "
    385                           "failed. Got %r, expected %r" % (line, testline))
    386             testline = testlines.pop(0)
    387             buf = array("c", "\x00" * len(testline))
    388             try:
    389                 f.readinto(buf)
    390             except ValueError:
    391                 self.fail("readinto() after next() with supposedly empty "
    392                           "iteration-buffer failed anyway")
    393             line = buf.tostring()
    394             if line != testline:
    395                 self.fail("readinto() after next() with empty buffer "
    396                           "failed. Got %r, expected %r" % (line, testline))
    397 
    398             testline = testlines.pop(0)
    399             try:
    400                 line = f.read(len(testline))
    401             except ValueError:
    402                 self.fail("read() after next() with supposedly empty "
    403                           "iteration-buffer failed anyway")
    404             if line != testline:
    405                 self.fail("read() after next() with empty buffer "
    406                           "failed. Got %r, expected %r" % (line, testline))
    407             try:
    408                 lines = f.readlines()
    409             except ValueError:
    410                 self.fail("readlines() after next() with supposedly empty "
    411                           "iteration-buffer failed anyway")
    412             if lines != testlines:
    413                 self.fail("readlines() after next() with empty buffer "
    414                           "failed. Got %r, expected %r" % (line, testline))
    415             # Reading after iteration hit EOF shouldn't hurt either
    416             f = open(TESTFN)
    417             try:
    418                 for line in f:
    419                     pass
    420                 try:
    421                     f.readline()
    422                     f.readinto(buf)
    423                     f.read()
    424                     f.readlines()
    425                 except ValueError:
    426                     self.fail("read* failed after next() consumed file")
    427             finally:
    428                 f.close()
    429         finally:
    430             os.unlink(TESTFN)
    431 
    432     @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    433     def test_write_full(self):
    434         devfull = '/dev/full'
    435         if not (os.path.exists(devfull) and
    436                 stat.S_ISCHR(os.stat(devfull).st_mode)):
    437             # Issue #21934: OpenBSD does not have a /dev/full character device
    438             self.skipTest('requires %r' % devfull)
    439         with open(devfull, 'wb', 1) as f:
    440             with self.assertRaises(IOError):
    441                 f.write('hello\n')
    442         with open(devfull, 'wb', 1) as f:
    443             with self.assertRaises(IOError):
    444                 # Issue #17976
    445                 f.write('hello')
    446                 f.write('\n')
    447         with open(devfull, 'wb', 0) as f:
    448             with self.assertRaises(IOError):
    449                 f.write('h')
    450 
    451     @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
    452     @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
    453     def test_very_long_line(self, size):
    454         # Issue #22526
    455         requires('largefile')
    456         with open(TESTFN, "wb") as fp:
    457             fp.seek(size - 1)
    458             fp.write("\0")
    459         with open(TESTFN, "rb") as fp:
    460             for l in fp:
    461                 pass
    462         self.assertEqual(len(l), size)
    463         self.assertEqual(l.count("\0"), size)
    464         l = None
    465 
    466 class FileSubclassTests(unittest.TestCase):
    467 
    468     def testExit(self):
    469         # test that exiting with context calls subclass' close
    470         class C(file):
    471             def __init__(self, *args):
    472                 self.subclass_closed = False
    473                 file.__init__(self, *args)
    474             def close(self):
    475                 self.subclass_closed = True
    476                 file.close(self)
    477 
    478         with C(TESTFN, 'w') as f:
    479             pass
    480         self.assertTrue(f.subclass_closed)
    481 
    482 
@unittest.skipUnless(threading, 'Threading required for this test.')
class FileThreadingTests(unittest.TestCase):
    # These tests check the ability to call various methods of file objects
    # (including close()) concurrently without crashing the Python interpreter.
    # See #815646, #595601
    #
    # Every worker thread operates on the same shared handle, self.f.  Each
    # test_close_open_* method supplies one small I/O operation that the
    # workers alternate with closing and reopening that handle.

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.f = None
        self.filename = TESTFN
        # Seed the file with the digits 0-9 separated by newlines.
        with open(self.filename, "w") as f:
            f.write("\n".join("0123456789"))
        # Guards close_count and close_success_count.
        self._count_lock = threading.Lock()
        self.close_count = 0
        self.close_success_count = 0
        self.use_buffering = False

    def tearDown(self):
        # Best-effort cleanup: ignore errors from closing the handle or
        # removing the file.
        if self.f:
            try:
                self.f.close()
            except (EnvironmentError, ValueError):
                pass
        try:
            os.remove(self.filename)
        except EnvironmentError:
            pass
        test_support.threading_cleanup(*self._threads)

    def _create_file(self):
        """(Re)open self.filename in "w+" mode and store it as self.f.

        A 16 KiB buffer is requested when self.use_buffering is true.
        """
        if self.use_buffering:
            self.f = open(self.filename, "w+", buffering=1024*16)
        else:
            self.f = open(self.filename, "w+")

    def _close_file(self):
        """Close self.f, counting the attempt and, separately, the success.

        close_success_count is only incremented when close() returns
        without raising.
        """
        with self._count_lock:
            self.close_count += 1
        self.f.close()
        with self._count_lock:
            self.close_success_count += 1

    def _close_and_reopen_file(self):
        self._close_file()
        # if close raises an exception that's fine, self.f remains valid so
        # we don't need to reopen.
        self._create_file()

    def _run_workers(self, func, nb_workers, duration=0.2):
        """Run func in nb_workers threads, then stop and join them.

        The counters are reset first.  The main thread then polls up to 100
        times, sleeping duration/100 seconds per poll, and stops polling
        early once the number of close attempts that have not (yet)
        succeeded exceeds nb_workers+1.  It sleeps for another `duration`
        seconds before clearing self.do_continue, which the workers are
        expected to check, and joining every thread.
        """
        with self._count_lock:
            self.close_count = 0
            self.close_success_count = 0
        self.do_continue = True
        threads = []
        try:
            for i in range(nb_workers):
                t = threading.Thread(target=func)
                t.start()
                threads.append(t)
            for _ in xrange(100):
                time.sleep(duration/100)
                with self._count_lock:
                    if self.close_count-self.close_success_count > nb_workers+1:
                        if test_support.verbose:
                            print 'Q',
                        break
            time.sleep(duration)
        finally:
            # Always stop and join the workers, even if starting a thread
            # or the polling loop raised.
            self.do_continue = False
            for t in threads:
                t.join()

    def _test_close_open_io(self, io_func, nb_workers=5):
        """Hammer self.f from nb_workers threads.

        Each worker opens the file and then alternates endlessly between
        io_func() and closing/reopening the file until self.do_continue is
        cleared.  IOError and ValueError raised by either step are ignored.
        """
        def worker():
            self._create_file()
            funcs = itertools.cycle((
                lambda: io_func(),
                lambda: self._close_and_reopen_file(),
            ))
            for f in funcs:
                if not self.do_continue:
                    break
                try:
                    f()
                except (IOError, ValueError):
                    pass
        self._run_workers(worker, nb_workers)
        if test_support.verbose:
            # Useful verbose statistics when tuning this test to take
            # less time to run but still ensuring that it's still useful.
            #
            # the percent of close calls that raised an error
            percent = 100. - 100.*self.close_success_count/self.close_count
            print self.close_count, ('%.4f ' % percent),

    def test_close_open(self):
        # No extra I/O: only close/reopen races between the workers.
        def io_func():
            pass
        self._test_close_open_io(io_func)

    def test_close_open_flush(self):
        def io_func():
            self.f.flush()
        self._test_close_open_io(io_func)

    def test_close_open_iter(self):
        def io_func():
            list(iter(self.f))
        self._test_close_open_io(io_func)

    def test_close_open_isatty(self):
        def io_func():
            self.f.isatty()
        self._test_close_open_io(io_func)

    def test_close_open_print(self):
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_print_buffered(self):
        # Same as test_close_open_print, but the workers open the file with
        # an explicit buffer size (see _create_file).
        self.use_buffering = True
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_read(self):
        def io_func():
            self.f.read(0)
        self._test_close_open_io(io_func)

    def test_close_open_readinto(self):
        def io_func():
            a = array('c', 'xxxxx')
            self.f.readinto(a)
        self._test_close_open_io(io_func)

    def test_close_open_readline(self):
        def io_func():
            self.f.readline()
        self._test_close_open_io(io_func)

    def test_close_open_readlines(self):
        def io_func():
            self.f.readlines()
        self._test_close_open_io(io_func)

    def test_close_open_seek(self):
        def io_func():
            self.f.seek(0, 0)
        self._test_close_open_io(io_func)

    def test_close_open_tell(self):
        def io_func():
            self.f.tell()
        self._test_close_open_io(io_func)

    def test_close_open_truncate(self):
        def io_func():
            self.f.truncate()
        self._test_close_open_io(io_func)

    def test_close_open_write(self):
        def io_func():
            self.f.write('')
        self._test_close_open_io(io_func)

    def test_close_open_writelines(self):
        def io_func():
            self.f.writelines('')
        self._test_close_open_io(io_func)
    654 
    655 
    656 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    657 class TestFileSignalEINTR(unittest.TestCase):
    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        # The child program, written as a single ';'-separated line for -c:
        # install a SIGINT handler that writes "$\n" to stderr, bind infile,
        # check that it is a file object, announce readiness with "Go.\n" on
        # stderr, then run the caller-supplied read_and_verify_code.
        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        # ("Go.\n" is exactly the 4 bytes read here.)
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+reader_process.stderr.read()))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            # Wait up to 0.05s for the child to write something to stderr;
            # a signal is sent on every pass regardless of the result.
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            # Give up after more than 200 signals without any stderr output.
            if signals_sent > 200:
                reader_process.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+reader_process.stderr.read()))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        # A non-zero exit status means the child's verification code failed.
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))
    737 
    738     def test_readline(self, universal_newlines=False):
    739         """file.readline must handle signals and not lose data."""
    740         self._test_reading(
    741                 data_to_write='hello, world!',
    742                 read_and_verify_code=(
    743                         'line = infile.readline() ;'
    744                         'expected_line = "hello, world!\\n" ;'
    745                         'assert line == expected_line, ('
    746                         '"read %r expected %r" % (line, expected_line))'
    747                 ),
    748                 method_name='readline',
    749                 universal_newlines=universal_newlines)
    750 
    751     def test_readline_with_universal_newlines(self):
    752         self.test_readline(universal_newlines=True)
    753 
    754     def test_readlines(self, universal_newlines=False):
    755         """file.readlines must handle signals and not lose data."""
    756         self._test_reading(
    757                 data_to_write='hello\nworld!',
    758                 read_and_verify_code=(
    759                         'lines = infile.readlines() ;'
    760                         'expected_lines = ["hello\\n", "world!\\n"] ;'
    761                         'assert lines == expected_lines, ('
    762                         '"readlines returned wrong data.\\n" '
    763                         '"got lines %r\\nexpected  %r" '
    764                         '% (lines, expected_lines))'
    765                 ),
    766                 method_name='readlines',
    767                 universal_newlines=universal_newlines)
    768 
    769     def test_readlines_with_universal_newlines(self):
    770         self.test_readlines(universal_newlines=True)
    771 
    772     def test_readall(self):
    773         """Unbounded file.read() must handle signals and not lose data."""
    774         self._test_reading(
    775                 data_to_write='hello, world!abcdefghijklm',
    776                 read_and_verify_code=(
    777                         'data = infile.read() ;'
    778                         'expected_data = "hello, world!abcdefghijklm\\n";'
    779                         'assert data == expected_data, ('
    780                         '"read %r expected %r" % (data, expected_data))'
    781                 ),
    782                 method_name='unbounded read')
    783 
    784     def test_readinto(self):
    785         """file.readinto must handle signals and not lose data."""
    786         self._test_reading(
    787                 data_to_write='hello, world!',
    788                 read_and_verify_code=(
    789                         'data = bytearray(50) ;'
    790                         'num_read = infile.readinto(data) ;'
    791                         'expected_data = "hello, world!\\n";'
    792                         'assert data[:num_read] == expected_data, ('
    793                         '"read %r expected %r" % (data, expected_data))'
    794                 ),
    795                 method_name='readinto')
    796 
    797 
    798 class StdoutTests(unittest.TestCase):
    799 
    800     def test_move_stdout_on_write(self):
    801         # Issue 3242: sys.stdout can be replaced (and freed) during a
    802         # print statement; prevent a segfault in this case
    803         save_stdout = sys.stdout
    804 
    805         class File:
    806             def write(self, data):
    807                 if '\n' in data:
    808                     sys.stdout = save_stdout
    809 
    810         try:
    811             sys.stdout = File()
    812             print "some text"
    813         finally:
    814             sys.stdout = save_stdout
    815 
    816     def test_del_stdout_before_print(self):
    817         # Issue 4597: 'print' with no argument wasn't reporting when
    818         # sys.stdout was deleted.
    819         save_stdout = sys.stdout
    820         del sys.stdout
    821         try:
    822             print
    823         except RuntimeError as e:
    824             self.assertEqual(str(e), "lost sys.stdout")
    825         else:
    826             self.fail("Expected RuntimeError")
    827         finally:
    828             sys.stdout = save_stdout
    829 
    830     def test_unicode(self):
    831         import subprocess
    832 
    833         def get_message(encoding, *code):
    834             code = '\n'.join(code)
    835             env = os.environ.copy()
    836             env['PYTHONIOENCODING'] = encoding
    837             process = subprocess.Popen([sys.executable, "-c", code],
    838                                        stdout=subprocess.PIPE, env=env)
    839             stdout, stderr = process.communicate()
    840             self.assertEqual(process.returncode, 0)
    841             return stdout
    842 
    843         def check_message(text, encoding, expected):
    844             stdout = get_message(encoding,
    845                 "import sys",
    846                 "sys.stdout.write(%r)" % text,
    847                 "sys.stdout.flush()")
    848             self.assertEqual(stdout, expected)
    849 
    850         # test the encoding
    851         check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
    852         check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
    853         check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
    854 
    855         # test the error handler
    856         check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
    857         check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
    858         check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
    859 
    860         # test the buffer API
    861         for objtype in ('buffer', 'bytearray'):
    862             stdout = get_message('ascii',
    863                 'import sys',
    864                 r'sys.stdout.write(%s("\xe9"))' % objtype,
    865                 'sys.stdout.flush()')
    866             self.assertEqual(stdout, "\xe9")
    867 
    868 
    869 def test_main():
    870     # Historically, these tests have been sloppy about removing TESTFN.
    871     # So get rid of it no matter what.
    872     try:
    873         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
    874             FileThreadingTests, TestFileSignalEINTR, StdoutTests)
    875     finally:
    876         if os.path.exists(TESTFN):
    877             os.unlink(TESTFN)
    878 
# When this module is executed directly as a script, run the tests via
# test_main().
if __name__ == '__main__':
    test_main()
    881