Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import unittest
      4 import itertools
      5 import select
      6 import signal
      7 import stat
      8 import subprocess
      9 import time
     10 from array import array
     11 from weakref import proxy
     12 try:
     13     import threading
     14 except ImportError:
     15     threading = None
     16 
     17 from test import test_support
     18 from test.test_support import TESTFN, run_unittest, requires
     19 from UserList import UserList
     20 
     21 class AutoFileTests(unittest.TestCase):
     22     # file tests for which a test file is automatically set up
     23 
     24     def setUp(self):
     25         self.f = open(TESTFN, 'wb')
     26 
     27     def tearDown(self):
     28         if self.f:
     29             self.f.close()
     30         os.remove(TESTFN)
     31 
     32     def testWeakRefs(self):
     33         # verify weak references
     34         p = proxy(self.f)
     35         p.write('teststring')
     36         self.assertEqual(self.f.tell(), p.tell())
     37         self.f.close()
     38         self.f = None
     39         self.assertRaises(ReferenceError, getattr, p, 'tell')
     40 
     41     def testAttributes(self):
     42         # verify expected attributes exist
     43         f = self.f
     44         with test_support.check_py3k_warnings():
     45             softspace = f.softspace
     46         f.name     # merely shouldn't blow up
     47         f.mode     # ditto
     48         f.closed   # ditto
     49 
     50         with test_support.check_py3k_warnings():
     51             # verify softspace is writable
     52             f.softspace = softspace    # merely shouldn't blow up
     53 
     54         # verify the others aren't
     55         for attr in 'name', 'mode', 'closed':
     56             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
     57 
     58     def testReadinto(self):
     59         # verify readinto
     60         self.f.write('12')
     61         self.f.close()
     62         a = array('c', 'x'*10)
     63         self.f = open(TESTFN, 'rb')
     64         n = self.f.readinto(a)
     65         self.assertEqual('12', a.tostring()[:n])
     66 
     67     def testWritelinesUserList(self):
     68         # verify writelines with instance sequence
     69         l = UserList(['1', '2'])
     70         self.f.writelines(l)
     71         self.f.close()
     72         self.f = open(TESTFN, 'rb')
     73         buf = self.f.read()
     74         self.assertEqual(buf, '12')
     75 
     76     def testWritelinesIntegers(self):
     77         # verify writelines with integers
     78         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     79 
     80     def testWritelinesIntegersUserList(self):
     81         # verify writelines with integers in UserList
     82         l = UserList([1,2,3])
     83         self.assertRaises(TypeError, self.f.writelines, l)
     84 
     85     def testWritelinesNonString(self):
     86         # verify writelines with non-string object
     87         class NonString:
     88             pass
     89 
     90         self.assertRaises(TypeError, self.f.writelines,
     91                           [NonString(), NonString()])
     92 
     93     def testWritelinesBuffer(self):
     94         self.f.writelines([array('c', 'abc')])
     95         self.f.close()
     96         self.f = open(TESTFN, 'rb')
     97         buf = self.f.read()
     98         self.assertEqual(buf, 'abc')
     99 
    100     def testRepr(self):
    101         # verify repr works
    102         self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
    103         # see issue #14161
    104         # Windows doesn't like \r\n\t" in the file name, but ' is ok
    105         fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
    106         with open(fname, 'w') as f:
    107             self.addCleanup(os.remove, fname)
    108             self.assertTrue(repr(f).startswith(
    109                     "<open file %r, mode 'w' at" % fname))
    110 
    111     def testErrors(self):
    112         self.f.close()
    113         self.f = open(TESTFN, 'rb')
    114         f = self.f
    115         self.assertEqual(f.name, TESTFN)
    116         self.assertTrue(not f.isatty())
    117         self.assertTrue(not f.closed)
    118 
    119         self.assertRaises(TypeError, f.readinto, "")
    120         f.close()
    121         self.assertTrue(f.closed)
    122 
    123     def testMethods(self):
    124         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
    125                    'readline', 'readlines', 'seek', 'tell', 'truncate',
    126                    'write', '__iter__']
    127         deprecated_methods = ['xreadlines']
    128         if sys.platform.startswith('atheos'):
    129             methods.remove('truncate')
    130 
    131         # __exit__ should close the file
    132         self.f.__exit__(None, None, None)
    133         self.assertTrue(self.f.closed)
    134 
    135         for methodname in methods:
    136             method = getattr(self.f, methodname)
    137             # should raise on closed file
    138             self.assertRaises(ValueError, method)
    139         with test_support.check_py3k_warnings():
    140             for methodname in deprecated_methods:
    141                 method = getattr(self.f, methodname)
    142                 self.assertRaises(ValueError, method)
    143         self.assertRaises(ValueError, self.f.writelines, [])
    144 
    145         # file is closed, __exit__ shouldn't do anything
    146         self.assertEqual(self.f.__exit__(None, None, None), None)
    147         # it must also return None if an exception was given
    148         try:
    149             1 // 0
    150         except:
    151             self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
    152 
    153     def testReadWhenWriting(self):
    154         self.assertRaises(IOError, self.f.read)
    155 
    156     def testNastyWritelinesGenerator(self):
    157         def nasty():
    158             for i in range(5):
    159                 if i == 3:
    160                     self.f.close()
    161                 yield str(i)
    162         self.assertRaises(ValueError, self.f.writelines, nasty())
    163 
    164     def testIssue5677(self):
    165         # Remark: Do not perform more than one test per open file,
    166         # since that does NOT catch the readline error on Windows.
    167         data = 'xxx'
    168         for mode in ['w', 'wb', 'a', 'ab']:
    169             for attr in ['read', 'readline', 'readlines']:
    170                 self.f = open(TESTFN, mode)
    171                 self.f.write(data)
    172                 self.assertRaises(IOError, getattr(self.f, attr))
    173                 self.f.close()
    174 
    175             self.f = open(TESTFN, mode)
    176             self.f.write(data)
    177             self.assertRaises(IOError, lambda: [line for line in self.f])
    178             self.f.close()
    179 
    180             self.f = open(TESTFN, mode)
    181             self.f.write(data)
    182             self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
    183             self.f.close()
    184 
    185         for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
    186             self.f = open(TESTFN, mode)
    187             self.assertRaises(IOError, self.f.write, data)
    188             self.f.close()
    189 
    190             self.f = open(TESTFN, mode)
    191             self.assertRaises(IOError, self.f.writelines, [data, data])
    192             self.f.close()
    193 
    194             self.f = open(TESTFN, mode)
    195             self.assertRaises(IOError, self.f.truncate)
    196             self.f.close()
    197 
    198 class OtherFileTests(unittest.TestCase):
    199 
    200     def testOpenDir(self):
    201         this_dir = os.path.dirname(__file__) or os.curdir
    202         for mode in (None, "w"):
    203             try:
    204                 if mode:
    205                     f = open(this_dir, mode)
    206                 else:
    207                     f = open(this_dir)
    208             except IOError as e:
    209                 self.assertEqual(e.filename, this_dir)
    210             else:
    211                 self.fail("opening a directory didn't raise an IOError")
    212 
    213     def testModeStrings(self):
    214         # check invalid mode strings
    215         for mode in ("", "aU", "wU+"):
    216             try:
    217                 f = open(TESTFN, mode)
    218             except ValueError:
    219                 pass
    220             else:
    221                 f.close()
    222                 self.fail('%r is an invalid file mode' % mode)
    223 
    224         # Some invalid modes fail on Windows, but pass on Unix
    225         # Issue3965: avoid a crash on Windows when filename is unicode
    226         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
    227             try:
    228                 f = open(name, "rr")
    229             except (IOError, ValueError):
    230                 pass
    231             else:
    232                 f.close()
    233 
    234     def testStdinSeek(self):
    235         if sys.platform == 'osf1V5':
    236             # This causes the interpreter to exit on OSF1 v5.1.
    237             self.skipTest('Skipping sys.stdin.seek(-1), it may crash '
    238                           'the interpreter. Test manually.')
    239 
    240         if not sys.stdin.isatty():
    241             # Issue #23168: if stdin is redirected to a file, stdin becomes
    242             # seekable
    243             self.skipTest('stdin must be a TTY in this test')
    244 
    245         self.assertRaises(IOError, sys.stdin.seek, -1)
    246 
    247     def testStdinTruncate(self):
    248         self.assertRaises(IOError, sys.stdin.truncate)
    249 
    250     def testUnicodeOpen(self):
    251         # verify repr works for unicode too
    252         f = open(unicode(TESTFN), "w")
    253         self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
    254         f.close()
    255         os.unlink(TESTFN)
    256 
    257     def testBadModeArgument(self):
    258         # verify that we get a sensible error message for bad mode argument
    259         bad_mode = "qwerty"
    260         try:
    261             f = open(TESTFN, bad_mode)
    262         except ValueError, msg:
    263             if msg.args[0] != 0:
    264                 s = str(msg)
    265                 if TESTFN in s or bad_mode not in s:
    266                     self.fail("bad error message for invalid mode: %s" % s)
    267             # if msg.args[0] == 0, we're probably on Windows where there may
    268             # be no obvious way to discover why open() failed.
    269         else:
    270             f.close()
    271             self.fail("no error for invalid mode: %s" % bad_mode)
    272 
    273     def testSetBufferSize(self):
    274         # make sure that explicitly setting the buffer size doesn't cause
    275         # misbehaviour especially with repeated close() calls
    276         for s in (-1, 0, 1, 512):
    277             try:
    278                 f = open(TESTFN, 'w', s)
    279                 f.write(str(s))
    280                 f.close()
    281                 f.close()
    282                 f = open(TESTFN, 'r', s)
    283                 d = int(f.read())
    284                 f.close()
    285                 f.close()
    286             except IOError, msg:
    287                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
    288             self.assertEqual(d, s)
    289 
    290     def testTruncateOnWindows(self):
    291         os.unlink(TESTFN)
    292 
    293         def bug801631():
    294             # SF bug <http://www.python.org/sf/801631>
    295             # "file.truncate fault on windows"
    296             f = open(TESTFN, 'wb')
    297             f.write('12345678901')   # 11 bytes
    298             f.close()
    299 
    300             f = open(TESTFN,'rb+')
    301             data = f.read(5)
    302             if data != '12345':
    303                 self.fail("Read on file opened for update failed %r" % data)
    304             if f.tell() != 5:
    305                 self.fail("File pos after read wrong %d" % f.tell())
    306 
    307             f.truncate()
    308             if f.tell() != 5:
    309                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    310 
    311             f.close()
    312             size = os.path.getsize(TESTFN)
    313             if size != 5:
    314                 self.fail("File size after ftruncate wrong %d" % size)
    315 
    316         try:
    317             bug801631()
    318         finally:
    319             os.unlink(TESTFN)
    320 
    321     def testIteration(self):
    322         # Test the complex interaction when mixing file-iteration and the
    323         # various read* methods. Ostensibly, the mixture could just be tested
    324         # to work when it should work according to the Python language,
    325         # instead of fail when it should fail according to the current CPython
    326         # implementation.  People don't always program Python the way they
    327         # should, though, and the implemenation might change in subtle ways,
    328         # so we explicitly test for errors, too; the test will just have to
    329         # be updated when the implementation changes.
    330         dataoffset = 16384
    331         filler = "ham\n"
    332         assert not dataoffset % len(filler), \
    333             "dataoffset must be multiple of len(filler)"
    334         nchunks = dataoffset // len(filler)
    335         testlines = [
    336             "spam, spam and eggs\n",
    337             "eggs, spam, ham and spam\n",
    338             "saussages, spam, spam and eggs\n",
    339             "spam, ham, spam and eggs\n",
    340             "spam, spam, spam, spam, spam, ham, spam\n",
    341             "wonderful spaaaaaam.\n"
    342         ]
    343         methods = [("readline", ()), ("read", ()), ("readlines", ()),
    344                    ("readinto", (array("c", " "*100),))]
    345 
    346         try:
    347             # Prepare the testfile
    348             bag = open(TESTFN, "w")
    349             bag.write(filler * nchunks)
    350             bag.writelines(testlines)
    351             bag.close()
    352             # Test for appropriate errors mixing read* and iteration
    353             for methodname, args in methods:
    354                 f = open(TESTFN)
    355                 if f.next() != filler:
    356                     self.fail, "Broken testfile"
    357                 meth = getattr(f, methodname)
    358                 try:
    359                     meth(*args)
    360                 except ValueError:
    361                     pass
    362                 else:
    363                     self.fail("%s%r after next() didn't raise ValueError" %
    364                                      (methodname, args))
    365                 f.close()
    366 
    367             # Test to see if harmless (by accident) mixing of read* and
    368             # iteration still works. This depends on the size of the internal
    369             # iteration buffer (currently 8192,) but we can test it in a
    370             # flexible manner.  Each line in the bag o' ham is 4 bytes
    371             # ("h", "a", "m", "\n"), so 4096 lines of that should get us
    372             # exactly on the buffer boundary for any power-of-2 buffersize
    373             # between 4 and 16384 (inclusive).
    374             f = open(TESTFN)
    375             for i in range(nchunks):
    376                 f.next()
    377             testline = testlines.pop(0)
    378             try:
    379                 line = f.readline()
    380             except ValueError:
    381                 self.fail("readline() after next() with supposedly empty "
    382                           "iteration-buffer failed anyway")
    383             if line != testline:
    384                 self.fail("readline() after next() with empty buffer "
    385                           "failed. Got %r, expected %r" % (line, testline))
    386             testline = testlines.pop(0)
    387             buf = array("c", "\x00" * len(testline))
    388             try:
    389                 f.readinto(buf)
    390             except ValueError:
    391                 self.fail("readinto() after next() with supposedly empty "
    392                           "iteration-buffer failed anyway")
    393             line = buf.tostring()
    394             if line != testline:
    395                 self.fail("readinto() after next() with empty buffer "
    396                           "failed. Got %r, expected %r" % (line, testline))
    397 
    398             testline = testlines.pop(0)
    399             try:
    400                 line = f.read(len(testline))
    401             except ValueError:
    402                 self.fail("read() after next() with supposedly empty "
    403                           "iteration-buffer failed anyway")
    404             if line != testline:
    405                 self.fail("read() after next() with empty buffer "
    406                           "failed. Got %r, expected %r" % (line, testline))
    407             try:
    408                 lines = f.readlines()
    409             except ValueError:
    410                 self.fail("readlines() after next() with supposedly empty "
    411                           "iteration-buffer failed anyway")
    412             if lines != testlines:
    413                 self.fail("readlines() after next() with empty buffer "
    414                           "failed. Got %r, expected %r" % (line, testline))
    415             # Reading after iteration hit EOF shouldn't hurt either
    416             f = open(TESTFN)
    417             try:
    418                 for line in f:
    419                     pass
    420                 try:
    421                     f.readline()
    422                     f.readinto(buf)
    423                     f.read()
    424                     f.readlines()
    425                 except ValueError:
    426                     self.fail("read* failed after next() consumed file")
    427             finally:
    428                 f.close()
    429         finally:
    430             os.unlink(TESTFN)
    431 
    432     @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    433     def test_write_full(self):
    434         devfull = '/dev/full'
    435         if not (os.path.exists(devfull) and
    436                 stat.S_ISCHR(os.stat(devfull).st_mode)):
    437             # Issue #21934: OpenBSD does not have a /dev/full character device
    438             self.skipTest('requires %r' % devfull)
    439         with open(devfull, 'wb', 1) as f:
    440             with self.assertRaises(IOError):
    441                 f.write('hello\n')
    442         with open(devfull, 'wb', 1) as f:
    443             with self.assertRaises(IOError):
    444                 # Issue #17976
    445                 f.write('hello')
    446                 f.write('\n')
    447         with open(devfull, 'wb', 0) as f:
    448             with self.assertRaises(IOError):
    449                 f.write('h')
    450 
    451     @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
    452     @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
    453     def test_very_long_line(self, size):
    454         # Issue #22526
    455         requires('largefile')
    456         with open(TESTFN, "wb") as fp:
    457             fp.seek(size - 1)
    458             fp.write("\0")
    459         with open(TESTFN, "rb") as fp:
    460             for l in fp:
    461                 pass
    462         self.assertEqual(len(l), size)
    463         self.assertEqual(l.count("\0"), size)
    464         l = None
    465 
    466 class FileSubclassTests(unittest.TestCase):
    467 
    468     def testExit(self):
    469         # test that exiting with context calls subclass' close
    470         class C(file):
    471             def __init__(self, *args):
    472                 self.subclass_closed = False
    473                 file.__init__(self, *args)
    474             def close(self):
    475                 self.subclass_closed = True
    476                 file.close(self)
    477 
    478         with C(TESTFN, 'w') as f:
    479             pass
    480         self.assertTrue(f.subclass_closed)
    481 
    482 
    483 @unittest.skipUnless(threading, 'Threading required for this test.')
    484 class FileThreadingTests(unittest.TestCase):
    485     # These tests check the ability to call various methods of file objects
    486     # (including close()) concurrently without crashing the Python interpreter.
    487     # See #815646, #595601
    488 
    489     def setUp(self):
    490         self._threads = test_support.threading_setup()
    491         self.f = None
    492         self.filename = TESTFN
    493         with open(self.filename, "w") as f:
    494             f.write("\n".join("0123456789"))
    495         self._count_lock = threading.Lock()
    496         self.close_count = 0
    497         self.close_success_count = 0
    498         self.use_buffering = False
    499 
    500     def tearDown(self):
    501         if self.f:
    502             try:
    503                 self.f.close()
    504             except (EnvironmentError, ValueError):
    505                 pass
    506         try:
    507             os.remove(self.filename)
    508         except EnvironmentError:
    509             pass
    510         test_support.threading_cleanup(*self._threads)
    511 
    512     def _create_file(self):
    513         if self.use_buffering:
    514             self.f = open(self.filename, "w+", buffering=1024*16)
    515         else:
    516             self.f = open(self.filename, "w+")
    517 
    518     def _close_file(self):
    519         with self._count_lock:
    520             self.close_count += 1
    521         self.f.close()
    522         with self._count_lock:
    523             self.close_success_count += 1
    524 
    525     def _close_and_reopen_file(self):
    526         self._close_file()
    527         # if close raises an exception thats fine, self.f remains valid so
    528         # we don't need to reopen.
    529         self._create_file()
    530 
    531     def _run_workers(self, func, nb_workers, duration=0.2):
    532         with self._count_lock:
    533             self.close_count = 0
    534             self.close_success_count = 0
    535         self.do_continue = True
    536         threads = []
    537         try:
    538             for i in range(nb_workers):
    539                 t = threading.Thread(target=func)
    540                 t.start()
    541                 threads.append(t)
    542             for _ in xrange(100):
    543                 time.sleep(duration/100)
    544                 with self._count_lock:
    545                     if self.close_count-self.close_success_count > nb_workers+1:
    546                         if test_support.verbose:
    547                             print 'Q',
    548                         break
    549             time.sleep(duration)
    550         finally:
    551             self.do_continue = False
    552             for t in threads:
    553                 t.join()
    554 
    555     def _test_close_open_io(self, io_func, nb_workers=5):
    556         def worker():
    557             self._create_file()
    558             funcs = itertools.cycle((
    559                 lambda: io_func(),
    560                 lambda: self._close_and_reopen_file(),
    561             ))
    562             for f in funcs:
    563                 if not self.do_continue:
    564                     break
    565                 try:
    566                     f()
    567                 except (IOError, ValueError):
    568                     pass
    569         self._run_workers(worker, nb_workers)
    570         if test_support.verbose:
    571             # Useful verbose statistics when tuning this test to take
    572             # less time to run but still ensuring that its still useful.
    573             #
    574             # the percent of close calls that raised an error
    575             percent = 100. - 100.*self.close_success_count/self.close_count
    576             print self.close_count, ('%.4f ' % percent),
    577 
    578     def test_close_open(self):
    579         def io_func():
    580             pass
    581         self._test_close_open_io(io_func)
    582 
    583     def test_close_open_flush(self):
    584         def io_func():
    585             self.f.flush()
    586         self._test_close_open_io(io_func)
    587 
    588     def test_close_open_iter(self):
    589         def io_func():
    590             list(iter(self.f))
    591         self._test_close_open_io(io_func)
    592 
    593     def test_close_open_isatty(self):
    594         def io_func():
    595             self.f.isatty()
    596         self._test_close_open_io(io_func)
    597 
    598     def test_close_open_print(self):
    599         def io_func():
    600             print >> self.f, ''
    601         self._test_close_open_io(io_func)
    602 
    603     def test_close_open_print_buffered(self):
    604         self.use_buffering = True
    605         def io_func():
    606             print >> self.f, ''
    607         self._test_close_open_io(io_func)
    608 
    609     def test_close_open_read(self):
    610         def io_func():
    611             self.f.read(0)
    612         self._test_close_open_io(io_func)
    613 
    614     def test_close_open_readinto(self):
    615         def io_func():
    616             a = array('c', 'xxxxx')
    617             self.f.readinto(a)
    618         self._test_close_open_io(io_func)
    619 
    620     def test_close_open_readline(self):
    621         def io_func():
    622             self.f.readline()
    623         self._test_close_open_io(io_func)
    624 
    625     def test_close_open_readlines(self):
    626         def io_func():
    627             self.f.readlines()
    628         self._test_close_open_io(io_func)
    629 
    630     def test_close_open_seek(self):
    631         def io_func():
    632             self.f.seek(0, 0)
    633         self._test_close_open_io(io_func)
    634 
    635     def test_close_open_tell(self):
    636         def io_func():
    637             self.f.tell()
    638         self._test_close_open_io(io_func)
    639 
    640     def test_close_open_truncate(self):
    641         def io_func():
    642             self.f.truncate()
    643         self._test_close_open_io(io_func)
    644 
    645     def test_close_open_write(self):
    646         def io_func():
    647             self.f.write('')
    648         self._test_close_open_io(io_func)
    649 
    650     def test_close_open_writelines(self):
    651         def io_func():
    652             self.f.writelines('')
    653         self._test_close_open_io(io_func)
    654 
    655     def test_iteration_torture(self):
    656         # bpo-31530
    657         with open(self.filename, "wb") as fp:
    658             for i in xrange(2**20):
    659                 fp.write(b"0"*50 + b"\n")
    660         with open(self.filename, "rb") as f:
    661             def it():
    662                 for l in f:
    663                     pass
    664             self._run_workers(it, 10)
    665 
    666     def test_iteration_seek(self):
    667         # bpo-31530: Crash when concurrently seek and iterate over a file.
    668         with open(self.filename, "wb") as fp:
    669             for i in xrange(10000):
    670                 fp.write(b"0"*50 + b"\n")
    671         with open(self.filename, "rb") as f:
    672             it = iter([1] + [0]*10)  # one thread reads, others seek
    673             def iterate():
    674                 if next(it):
    675                     for l in f:
    676                         pass
    677                 else:
    678                     for i in xrange(100):
    679                         f.seek(i*100, 0)
    680             self._run_workers(iterate, 10)
    681 
    682 
    683 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    684 class TestFileSignalEINTR(unittest.TestCase):
    685     def _test_reading(self, data_to_write, read_and_verify_code, method_name,
    686                       universal_newlines=False):
    687         """Generic buffered read method test harness to verify EINTR behavior.
    688 
    689         Also validates that Python signal handlers are run during the read.
    690 
    691         Args:
    692             data_to_write: String to write to the child process for reading
    693                 before sending it a signal, confirming the signal was handled,
    694                 writing a final newline char and closing the infile pipe.
    695             read_and_verify_code: Single "line" of code to read from a file
    696                 object named 'infile' and validate the result.  This will be
    697                 executed as part of a python subprocess fed data_to_write.
    698             method_name: The name of the read method being tested, for use in
    699                 an error message on failure.
    700             universal_newlines: If True, infile will be opened in universal
    701                 newline mode in the child process.
    702         """
    703         if universal_newlines:
    704             # Test the \r\n -> \n conversion while we're at it.
    705             data_to_write = data_to_write.replace('\n', '\r\n')
    706             infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
    707         else:
    708             infile_setup_code = 'infile = sys.stdin'
    709         # Total pipe IO in this function is smaller than the minimum posix OS
    710         # pipe buffer size of 512 bytes.  No writer should block.
    711         assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
    712 
    713         child_code = (
    714              'import os, signal, sys ;'
    715              'signal.signal('
    716                      'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
    717              + infile_setup_code + ' ;' +
    718              'assert isinstance(infile, file) ;'
    719              'sys.stderr.write("Go.\\n") ;'
    720              + read_and_verify_code)
    721         reader_process = subprocess.Popen(
    722                 [sys.executable, '-c', child_code],
    723                 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    724                 stderr=subprocess.PIPE)
    725         # Wait for the signal handler to be installed.
    726         go = reader_process.stderr.read(4)
    727         if go != 'Go.\n':
    728             reader_process.kill()
    729             self.fail('Error from %s process while awaiting "Go":\n%s' % (
    730                     method_name, go+reader_process.stderr.read()))
    731         reader_process.stdin.write(data_to_write)
    732         signals_sent = 0
    733         rlist = []
    734         # We don't know when the read_and_verify_code in our child is actually
    735         # executing within the read system call we want to interrupt.  This
    736         # loop waits for a bit before sending the first signal to increase
    737         # the likelihood of that.  Implementations without correct EINTR
    738         # and signal handling usually fail this test.
    739         while not rlist:
    740             rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
    741             reader_process.send_signal(signal.SIGINT)
    742             # Give the subprocess time to handle it before we loop around and
    743             # send another one.  On OSX the second signal happening close to
    744             # immediately after the first was causing the subprocess to crash
    745             # via the OS's default SIGINT handler.
    746             time.sleep(0.1)
    747             signals_sent += 1
    748             if signals_sent > 200:
    749                 reader_process.kill()
    750                 self.fail("failed to handle signal during %s." % method_name)
    751         # This assumes anything unexpected that writes to stderr will also
    752         # write a newline.  That is true of the traceback printing code.
    753         signal_line = reader_process.stderr.readline()
    754         if signal_line != '$\n':
    755             reader_process.kill()
    756             self.fail('Error from %s process while awaiting signal:\n%s' % (
    757                     method_name, signal_line+reader_process.stderr.read()))
    758         # We append a newline to our input so that a readline call can
    759         # end on its own before the EOF is seen.
    760         stdout, stderr = reader_process.communicate(input='\n')
    761         if reader_process.returncode != 0:
    762             self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
    763                     method_name, reader_process.returncode, stdout, stderr))
    764 
    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        # Snippet run by the child process: read a single line from infile
        # and check that it is the written data followed by a newline.
        verify_code = (
                'line = infile.readline() ;'
                'expected_line = "hello, world!\\n" ;'
                'assert line == expected_line, ('
                '"read %r expected %r" % (line, expected_line))')
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=verify_code,
                method_name='readline',
                universal_newlines=universal_newlines)
    777 
    def test_readline_with_universal_newlines(self):
        # Re-run the readline scenario with universal_newlines=True so the
        # same check is exercised through the universal newline code path.
        self.test_readline(universal_newlines=True)
    780 
    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        # Snippet run by the child process: read every line from infile and
        # check that both lines arrived intact, each newline-terminated.
        verify_code = (
                'lines = infile.readlines() ;'
                'expected_lines = ["hello\\n", "world!\\n"] ;'
                'assert lines == expected_lines, ('
                '"readlines returned wrong data.\\n" '
                '"got lines %r\\nexpected  %r" '
                '% (lines, expected_lines))')
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=verify_code,
                method_name='readlines',
                universal_newlines=universal_newlines)
    795 
    def test_readlines_with_universal_newlines(self):
        # Re-run the readlines scenario with universal_newlines=True so the
        # same check is exercised through the universal newline code path.
        self.test_readlines(universal_newlines=True)
    798 
    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        # Snippet run by the child process: read infile until EOF and check
        # that the whole payload plus the final newline came through.
        verify_code = (
                'data = infile.read() ;'
                'expected_data = "hello, world!abcdefghijklm\\n";'
                'assert data == expected_data, ('
                '"read %r expected %r" % (data, expected_data))')
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=verify_code,
                method_name='unbounded read')
    810 
    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        # Snippet run by the child process: fill a 50-byte buffer via
        # readinto() and compare only the num_read bytes actually filled.
        # The failure message reports that same slice, so the zero padding
        # of the oversized buffer does not obscure what was really read.
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % '
                        '(data[:num_read], expected_data))'
                ),
                method_name='readinto')
    823 
    824 
    825 class StdoutTests(unittest.TestCase):
    826 
    827     def test_move_stdout_on_write(self):
    828         # Issue 3242: sys.stdout can be replaced (and freed) during a
    829         # print statement; prevent a segfault in this case
    830         save_stdout = sys.stdout
    831 
    832         class File:
    833             def write(self, data):
    834                 if '\n' in data:
    835                     sys.stdout = save_stdout
    836 
    837         try:
    838             sys.stdout = File()
    839             print "some text"
    840         finally:
    841             sys.stdout = save_stdout
    842 
    843     def test_del_stdout_before_print(self):
    844         # Issue 4597: 'print' with no argument wasn't reporting when
    845         # sys.stdout was deleted.
    846         save_stdout = sys.stdout
    847         del sys.stdout
    848         try:
    849             print
    850         except RuntimeError as e:
    851             self.assertEqual(str(e), "lost sys.stdout")
    852         else:
    853             self.fail("Expected RuntimeError")
    854         finally:
    855             sys.stdout = save_stdout
    856 
    857     def test_unicode(self):
    858         import subprocess
    859 
    860         def get_message(encoding, *code):
    861             code = '\n'.join(code)
    862             env = os.environ.copy()
    863             env['PYTHONIOENCODING'] = encoding
    864             process = subprocess.Popen([sys.executable, "-c", code],
    865                                        stdout=subprocess.PIPE, env=env)
    866             stdout, stderr = process.communicate()
    867             self.assertEqual(process.returncode, 0)
    868             return stdout
    869 
    870         def check_message(text, encoding, expected):
    871             stdout = get_message(encoding,
    872                 "import sys",
    873                 "sys.stdout.write(%r)" % text,
    874                 "sys.stdout.flush()")
    875             self.assertEqual(stdout, expected)
    876 
    877         # test the encoding
    878         check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
    879         check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
    880         check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
    881 
    882         # test the error handler
    883         check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
    884         check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
    885         check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
    886 
    887         # test the buffer API
    888         for objtype in ('buffer', 'bytearray'):
    889             stdout = get_message('ascii',
    890                 'import sys',
    891                 r'sys.stdout.write(%s("\xe9"))' % objtype,
    892                 'sys.stdout.flush()')
    893             self.assertEqual(stdout, "\xe9")
    894 
    895 
    896 def test_main():
    897     # Historically, these tests have been sloppy about removing TESTFN.
    898     # So get rid of it no matter what.
    899     try:
    900         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
    901             FileThreadingTests, TestFileSignalEINTR, StdoutTests)
    902     finally:
    903         if os.path.exists(TESTFN):
    904             os.unlink(TESTFN)
    905 
# Run the whole test module when this file is executed as a script.
if __name__ == '__main__':
    test_main()
    908