Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import unittest
      4 import itertools
      5 import select
      6 import signal
      7 import subprocess
      8 import time
      9 from array import array
     10 from weakref import proxy
     11 try:
     12     import threading
     13 except ImportError:
     14     threading = None
     15 
     16 from test import test_support
     17 from test.test_support import TESTFN, run_unittest
     18 from UserList import UserList
     19 
     20 class AutoFileTests(unittest.TestCase):
     21     # file tests for which a test file is automatically set up
     22 
     23     def setUp(self):
     24         self.f = open(TESTFN, 'wb')
     25 
     26     def tearDown(self):
     27         if self.f:
     28             self.f.close()
     29         os.remove(TESTFN)
     30 
     31     def testWeakRefs(self):
     32         # verify weak references
     33         p = proxy(self.f)
     34         p.write('teststring')
     35         self.assertEqual(self.f.tell(), p.tell())
     36         self.f.close()
     37         self.f = None
     38         self.assertRaises(ReferenceError, getattr, p, 'tell')
     39 
     40     def testAttributes(self):
     41         # verify expected attributes exist
     42         f = self.f
     43         with test_support.check_py3k_warnings():
     44             softspace = f.softspace
     45         f.name     # merely shouldn't blow up
     46         f.mode     # ditto
     47         f.closed   # ditto
     48 
     49         with test_support.check_py3k_warnings():
     50             # verify softspace is writable
     51             f.softspace = softspace    # merely shouldn't blow up
     52 
     53         # verify the others aren't
     54         for attr in 'name', 'mode', 'closed':
     55             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
     56 
     57     def testReadinto(self):
     58         # verify readinto
     59         self.f.write('12')
     60         self.f.close()
     61         a = array('c', 'x'*10)
     62         self.f = open(TESTFN, 'rb')
     63         n = self.f.readinto(a)
     64         self.assertEqual('12', a.tostring()[:n])
     65 
     66     def testWritelinesUserList(self):
     67         # verify writelines with instance sequence
     68         l = UserList(['1', '2'])
     69         self.f.writelines(l)
     70         self.f.close()
     71         self.f = open(TESTFN, 'rb')
     72         buf = self.f.read()
     73         self.assertEqual(buf, '12')
     74 
     75     def testWritelinesIntegers(self):
     76         # verify writelines with integers
     77         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     78 
     79     def testWritelinesIntegersUserList(self):
     80         # verify writelines with integers in UserList
     81         l = UserList([1,2,3])
     82         self.assertRaises(TypeError, self.f.writelines, l)
     83 
     84     def testWritelinesNonString(self):
     85         # verify writelines with non-string object
     86         class NonString:
     87             pass
     88 
     89         self.assertRaises(TypeError, self.f.writelines,
     90                           [NonString(), NonString()])
     91 
     92     def testRepr(self):
     93         # verify repr works
     94         self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
     95         # see issue #14161
     96         # Windows doesn't like \r\n\t" in the file name, but ' is ok
     97         fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
     98         with open(fname, 'w') as f:
     99             self.addCleanup(os.remove, fname)
    100             self.assertTrue(repr(f).startswith(
    101                     "<open file %r, mode 'w' at" % fname))
    102 
    103     def testErrors(self):
    104         self.f.close()
    105         self.f = open(TESTFN, 'rb')
    106         f = self.f
    107         self.assertEqual(f.name, TESTFN)
    108         self.assertTrue(not f.isatty())
    109         self.assertTrue(not f.closed)
    110 
    111         self.assertRaises(TypeError, f.readinto, "")
    112         f.close()
    113         self.assertTrue(f.closed)
    114 
    115     def testMethods(self):
    116         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
    117                    'readline', 'readlines', 'seek', 'tell', 'truncate',
    118                    'write', '__iter__']
    119         deprecated_methods = ['xreadlines']
    120         if sys.platform.startswith('atheos'):
    121             methods.remove('truncate')
    122 
    123         # __exit__ should close the file
    124         self.f.__exit__(None, None, None)
    125         self.assertTrue(self.f.closed)
    126 
    127         for methodname in methods:
    128             method = getattr(self.f, methodname)
    129             # should raise on closed file
    130             self.assertRaises(ValueError, method)
    131         with test_support.check_py3k_warnings():
    132             for methodname in deprecated_methods:
    133                 method = getattr(self.f, methodname)
    134                 self.assertRaises(ValueError, method)
    135         self.assertRaises(ValueError, self.f.writelines, [])
    136 
    137         # file is closed, __exit__ shouldn't do anything
    138         self.assertEqual(self.f.__exit__(None, None, None), None)
    139         # it must also return None if an exception was given
    140         try:
    141             1 // 0
    142         except:
    143             self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
    144 
    145     def testReadWhenWriting(self):
    146         self.assertRaises(IOError, self.f.read)
    147 
    148     def testNastyWritelinesGenerator(self):
    149         def nasty():
    150             for i in range(5):
    151                 if i == 3:
    152                     self.f.close()
    153                 yield str(i)
    154         self.assertRaises(ValueError, self.f.writelines, nasty())
    155 
    156     def testIssue5677(self):
    157         # Remark: Do not perform more than one test per open file,
    158         # since that does NOT catch the readline error on Windows.
    159         data = 'xxx'
    160         for mode in ['w', 'wb', 'a', 'ab']:
    161             for attr in ['read', 'readline', 'readlines']:
    162                 self.f = open(TESTFN, mode)
    163                 self.f.write(data)
    164                 self.assertRaises(IOError, getattr(self.f, attr))
    165                 self.f.close()
    166 
    167             self.f = open(TESTFN, mode)
    168             self.f.write(data)
    169             self.assertRaises(IOError, lambda: [line for line in self.f])
    170             self.f.close()
    171 
    172             self.f = open(TESTFN, mode)
    173             self.f.write(data)
    174             self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
    175             self.f.close()
    176 
    177         for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
    178             self.f = open(TESTFN, mode)
    179             self.assertRaises(IOError, self.f.write, data)
    180             self.f.close()
    181 
    182             self.f = open(TESTFN, mode)
    183             self.assertRaises(IOError, self.f.writelines, [data, data])
    184             self.f.close()
    185 
    186             self.f = open(TESTFN, mode)
    187             self.assertRaises(IOError, self.f.truncate)
    188             self.f.close()
    189 
    190 class OtherFileTests(unittest.TestCase):
    191 
    192     def testOpenDir(self):
    193         this_dir = os.path.dirname(__file__) or os.curdir
    194         for mode in (None, "w"):
    195             try:
    196                 if mode:
    197                     f = open(this_dir, mode)
    198                 else:
    199                     f = open(this_dir)
    200             except IOError as e:
    201                 self.assertEqual(e.filename, this_dir)
    202             else:
    203                 self.fail("opening a directory didn't raise an IOError")
    204 
    205     def testModeStrings(self):
    206         # check invalid mode strings
    207         for mode in ("", "aU", "wU+"):
    208             try:
    209                 f = open(TESTFN, mode)
    210             except ValueError:
    211                 pass
    212             else:
    213                 f.close()
    214                 self.fail('%r is an invalid file mode' % mode)
    215 
    216         # Some invalid modes fail on Windows, but pass on Unix
    217         # Issue3965: avoid a crash on Windows when filename is unicode
    218         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
    219             try:
    220                 f = open(name, "rr")
    221             except (IOError, ValueError):
    222                 pass
    223             else:
    224                 f.close()
    225 
    226     def testStdin(self):
    227         # This causes the interpreter to exit on OSF1 v5.1.
    228         if sys.platform != 'osf1V5':
    229             self.assertRaises(IOError, sys.stdin.seek, -1)
    230         else:
    231             print >>sys.__stdout__, (
    232                 '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
    233                 ' Test manually.')
    234         self.assertRaises(IOError, sys.stdin.truncate)
    235 
    236     def testUnicodeOpen(self):
    237         # verify repr works for unicode too
    238         f = open(unicode(TESTFN), "w")
    239         self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
    240         f.close()
    241         os.unlink(TESTFN)
    242 
    243     def testBadModeArgument(self):
    244         # verify that we get a sensible error message for bad mode argument
    245         bad_mode = "qwerty"
    246         try:
    247             f = open(TESTFN, bad_mode)
    248         except ValueError, msg:
    249             if msg.args[0] != 0:
    250                 s = str(msg)
    251                 if TESTFN in s or bad_mode not in s:
    252                     self.fail("bad error message for invalid mode: %s" % s)
    253             # if msg.args[0] == 0, we're probably on Windows where there may
    254             # be no obvious way to discover why open() failed.
    255         else:
    256             f.close()
    257             self.fail("no error for invalid mode: %s" % bad_mode)
    258 
    259     def testSetBufferSize(self):
    260         # make sure that explicitly setting the buffer size doesn't cause
    261         # misbehaviour especially with repeated close() calls
    262         for s in (-1, 0, 1, 512):
    263             try:
    264                 f = open(TESTFN, 'w', s)
    265                 f.write(str(s))
    266                 f.close()
    267                 f.close()
    268                 f = open(TESTFN, 'r', s)
    269                 d = int(f.read())
    270                 f.close()
    271                 f.close()
    272             except IOError, msg:
    273                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
    274             self.assertEqual(d, s)
    275 
    276     def testTruncateOnWindows(self):
    277         os.unlink(TESTFN)
    278 
    279         def bug801631():
    280             # SF bug <http://www.python.org/sf/801631>
    281             # "file.truncate fault on windows"
    282             f = open(TESTFN, 'wb')
    283             f.write('12345678901')   # 11 bytes
    284             f.close()
    285 
    286             f = open(TESTFN,'rb+')
    287             data = f.read(5)
    288             if data != '12345':
    289                 self.fail("Read on file opened for update failed %r" % data)
    290             if f.tell() != 5:
    291                 self.fail("File pos after read wrong %d" % f.tell())
    292 
    293             f.truncate()
    294             if f.tell() != 5:
    295                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    296 
    297             f.close()
    298             size = os.path.getsize(TESTFN)
    299             if size != 5:
    300                 self.fail("File size after ftruncate wrong %d" % size)
    301 
    302         try:
    303             bug801631()
    304         finally:
    305             os.unlink(TESTFN)
    306 
    307     def testIteration(self):
    308         # Test the complex interaction when mixing file-iteration and the
    309         # various read* methods. Ostensibly, the mixture could just be tested
    310         # to work when it should work according to the Python language,
    311         # instead of fail when it should fail according to the current CPython
    312         # implementation.  People don't always program Python the way they
    313         # should, though, and the implemenation might change in subtle ways,
    314         # so we explicitly test for errors, too; the test will just have to
    315         # be updated when the implementation changes.
    316         dataoffset = 16384
    317         filler = "ham\n"
    318         assert not dataoffset % len(filler), \
    319             "dataoffset must be multiple of len(filler)"
    320         nchunks = dataoffset // len(filler)
    321         testlines = [
    322             "spam, spam and eggs\n",
    323             "eggs, spam, ham and spam\n",
    324             "saussages, spam, spam and eggs\n",
    325             "spam, ham, spam and eggs\n",
    326             "spam, spam, spam, spam, spam, ham, spam\n",
    327             "wonderful spaaaaaam.\n"
    328         ]
    329         methods = [("readline", ()), ("read", ()), ("readlines", ()),
    330                    ("readinto", (array("c", " "*100),))]
    331 
    332         try:
    333             # Prepare the testfile
    334             bag = open(TESTFN, "w")
    335             bag.write(filler * nchunks)
    336             bag.writelines(testlines)
    337             bag.close()
    338             # Test for appropriate errors mixing read* and iteration
    339             for methodname, args in methods:
    340                 f = open(TESTFN)
    341                 if f.next() != filler:
    342                     self.fail, "Broken testfile"
    343                 meth = getattr(f, methodname)
    344                 try:
    345                     meth(*args)
    346                 except ValueError:
    347                     pass
    348                 else:
    349                     self.fail("%s%r after next() didn't raise ValueError" %
    350                                      (methodname, args))
    351                 f.close()
    352 
    353             # Test to see if harmless (by accident) mixing of read* and
    354             # iteration still works. This depends on the size of the internal
    355             # iteration buffer (currently 8192,) but we can test it in a
    356             # flexible manner.  Each line in the bag o' ham is 4 bytes
    357             # ("h", "a", "m", "\n"), so 4096 lines of that should get us
    358             # exactly on the buffer boundary for any power-of-2 buffersize
    359             # between 4 and 16384 (inclusive).
    360             f = open(TESTFN)
    361             for i in range(nchunks):
    362                 f.next()
    363             testline = testlines.pop(0)
    364             try:
    365                 line = f.readline()
    366             except ValueError:
    367                 self.fail("readline() after next() with supposedly empty "
    368                           "iteration-buffer failed anyway")
    369             if line != testline:
    370                 self.fail("readline() after next() with empty buffer "
    371                           "failed. Got %r, expected %r" % (line, testline))
    372             testline = testlines.pop(0)
    373             buf = array("c", "\x00" * len(testline))
    374             try:
    375                 f.readinto(buf)
    376             except ValueError:
    377                 self.fail("readinto() after next() with supposedly empty "
    378                           "iteration-buffer failed anyway")
    379             line = buf.tostring()
    380             if line != testline:
    381                 self.fail("readinto() after next() with empty buffer "
    382                           "failed. Got %r, expected %r" % (line, testline))
    383 
    384             testline = testlines.pop(0)
    385             try:
    386                 line = f.read(len(testline))
    387             except ValueError:
    388                 self.fail("read() after next() with supposedly empty "
    389                           "iteration-buffer failed anyway")
    390             if line != testline:
    391                 self.fail("read() after next() with empty buffer "
    392                           "failed. Got %r, expected %r" % (line, testline))
    393             try:
    394                 lines = f.readlines()
    395             except ValueError:
    396                 self.fail("readlines() after next() with supposedly empty "
    397                           "iteration-buffer failed anyway")
    398             if lines != testlines:
    399                 self.fail("readlines() after next() with empty buffer "
    400                           "failed. Got %r, expected %r" % (line, testline))
    401             # Reading after iteration hit EOF shouldn't hurt either
    402             f = open(TESTFN)
    403             try:
    404                 for line in f:
    405                     pass
    406                 try:
    407                     f.readline()
    408                     f.readinto(buf)
    409                     f.read()
    410                     f.readlines()
    411                 except ValueError:
    412                     self.fail("read* failed after next() consumed file")
    413             finally:
    414                 f.close()
    415         finally:
    416             os.unlink(TESTFN)
    417 
    418 class FileSubclassTests(unittest.TestCase):
    419 
    420     def testExit(self):
    421         # test that exiting with context calls subclass' close
    422         class C(file):
    423             def __init__(self, *args):
    424                 self.subclass_closed = False
    425                 file.__init__(self, *args)
    426             def close(self):
    427                 self.subclass_closed = True
    428                 file.close(self)
    429 
    430         with C(TESTFN, 'w') as f:
    431             pass
    432         self.assertTrue(f.subclass_closed)
    433 
    434 
    435 @unittest.skipUnless(threading, 'Threading required for this test.')
    436 class FileThreadingTests(unittest.TestCase):
    437     # These tests check the ability to call various methods of file objects
    438     # (including close()) concurrently without crashing the Python interpreter.
    439     # See #815646, #595601
    440 
    441     def setUp(self):
    442         self._threads = test_support.threading_setup()
    443         self.f = None
    444         self.filename = TESTFN
    445         with open(self.filename, "w") as f:
    446             f.write("\n".join("0123456789"))
    447         self._count_lock = threading.Lock()
    448         self.close_count = 0
    449         self.close_success_count = 0
    450         self.use_buffering = False
    451 
    452     def tearDown(self):
    453         if self.f:
    454             try:
    455                 self.f.close()
    456             except (EnvironmentError, ValueError):
    457                 pass
    458         try:
    459             os.remove(self.filename)
    460         except EnvironmentError:
    461             pass
    462         test_support.threading_cleanup(*self._threads)
    463 
    464     def _create_file(self):
    465         if self.use_buffering:
    466             self.f = open(self.filename, "w+", buffering=1024*16)
    467         else:
    468             self.f = open(self.filename, "w+")
    469 
    470     def _close_file(self):
    471         with self._count_lock:
    472             self.close_count += 1
    473         self.f.close()
    474         with self._count_lock:
    475             self.close_success_count += 1
    476 
    477     def _close_and_reopen_file(self):
    478         self._close_file()
    479         # if close raises an exception thats fine, self.f remains valid so
    480         # we don't need to reopen.
    481         self._create_file()
    482 
    483     def _run_workers(self, func, nb_workers, duration=0.2):
    484         with self._count_lock:
    485             self.close_count = 0
    486             self.close_success_count = 0
    487         self.do_continue = True
    488         threads = []
    489         try:
    490             for i in range(nb_workers):
    491                 t = threading.Thread(target=func)
    492                 t.start()
    493                 threads.append(t)
    494             for _ in xrange(100):
    495                 time.sleep(duration/100)
    496                 with self._count_lock:
    497                     if self.close_count-self.close_success_count > nb_workers+1:
    498                         if test_support.verbose:
    499                             print 'Q',
    500                         break
    501             time.sleep(duration)
    502         finally:
    503             self.do_continue = False
    504             for t in threads:
    505                 t.join()
    506 
    507     def _test_close_open_io(self, io_func, nb_workers=5):
    508         def worker():
    509             self._create_file()
    510             funcs = itertools.cycle((
    511                 lambda: io_func(),
    512                 lambda: self._close_and_reopen_file(),
    513             ))
    514             for f in funcs:
    515                 if not self.do_continue:
    516                     break
    517                 try:
    518                     f()
    519                 except (IOError, ValueError):
    520                     pass
    521         self._run_workers(worker, nb_workers)
    522         if test_support.verbose:
    523             # Useful verbose statistics when tuning this test to take
    524             # less time to run but still ensuring that its still useful.
    525             #
    526             # the percent of close calls that raised an error
    527             percent = 100. - 100.*self.close_success_count/self.close_count
    528             print self.close_count, ('%.4f ' % percent),
    529 
    530     def test_close_open(self):
    531         def io_func():
    532             pass
    533         self._test_close_open_io(io_func)
    534 
    535     def test_close_open_flush(self):
    536         def io_func():
    537             self.f.flush()
    538         self._test_close_open_io(io_func)
    539 
    540     def test_close_open_iter(self):
    541         def io_func():
    542             list(iter(self.f))
    543         self._test_close_open_io(io_func)
    544 
    545     def test_close_open_isatty(self):
    546         def io_func():
    547             self.f.isatty()
    548         self._test_close_open_io(io_func)
    549 
    550     def test_close_open_print(self):
    551         def io_func():
    552             print >> self.f, ''
    553         self._test_close_open_io(io_func)
    554 
    555     def test_close_open_print_buffered(self):
    556         self.use_buffering = True
    557         def io_func():
    558             print >> self.f, ''
    559         self._test_close_open_io(io_func)
    560 
    561     def test_close_open_read(self):
    562         def io_func():
    563             self.f.read(0)
    564         self._test_close_open_io(io_func)
    565 
    566     def test_close_open_readinto(self):
    567         def io_func():
    568             a = array('c', 'xxxxx')
    569             self.f.readinto(a)
    570         self._test_close_open_io(io_func)
    571 
    572     def test_close_open_readline(self):
    573         def io_func():
    574             self.f.readline()
    575         self._test_close_open_io(io_func)
    576 
    577     def test_close_open_readlines(self):
    578         def io_func():
    579             self.f.readlines()
    580         self._test_close_open_io(io_func)
    581 
    582     def test_close_open_seek(self):
    583         def io_func():
    584             self.f.seek(0, 0)
    585         self._test_close_open_io(io_func)
    586 
    587     def test_close_open_tell(self):
    588         def io_func():
    589             self.f.tell()
    590         self._test_close_open_io(io_func)
    591 
    592     def test_close_open_truncate(self):
    593         def io_func():
    594             self.f.truncate()
    595         self._test_close_open_io(io_func)
    596 
    597     def test_close_open_write(self):
    598         def io_func():
    599             self.f.write('')
    600         self._test_close_open_io(io_func)
    601 
    602     def test_close_open_writelines(self):
    603         def io_func():
    604             self.f.writelines('')
    605         self._test_close_open_io(io_func)
    606 
    607 
    608 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    609 class TestFileSignalEINTR(unittest.TestCase):
    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.

        The test fails (via self.fail) if the child does not report "Go.",
        does not report a handled signal within 200 attempts, writes anything
        unexpected to stderr, or exits with a non-zero return code.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        # The child program, joined with ';' into one "line": install a
        # SIGINT handler that writes "$\n" to stderr, set up infile, announce
        # readiness with "Go.\n" on stderr, then run the code under test.
        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+reader_process.stderr.read()))
        # Deliberately no trailing newline and no EOF yet: the child's read
        # call is meant to still be waiting for input when the signal arrives.
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            # rlist becomes non-empty once the child has written something to
            # stderr (normally the handler's "$\n"); that ends the loop.
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            # Give up instead of looping forever on an unresponsive child.
            if signals_sent > 200:
                reader_process.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+reader_process.stderr.read()))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        # A non-zero exit status means the child's verification code (or the
        # child itself) failed; report everything it printed.
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))
    689 
    690     def test_readline(self, universal_newlines=False):
    691         """file.readline must handle signals and not lose data."""
    692         self._test_reading(
    693                 data_to_write='hello, world!',
    694                 read_and_verify_code=(
    695                         'line = infile.readline() ;'
    696                         'expected_line = "hello, world!\\n" ;'
    697                         'assert line == expected_line, ('
    698                         '"read %r expected %r" % (line, expected_line))'
    699                 ),
    700                 method_name='readline',
    701                 universal_newlines=universal_newlines)
    702 
    703     def test_readline_with_universal_newlines(self):
    704         self.test_readline(universal_newlines=True)
    705 
    706     def test_readlines(self, universal_newlines=False):
    707         """file.readlines must handle signals and not lose data."""
    708         self._test_reading(
    709                 data_to_write='hello\nworld!',
    710                 read_and_verify_code=(
    711                         'lines = infile.readlines() ;'
    712                         'expected_lines = ["hello\\n", "world!\\n"] ;'
    713                         'assert lines == expected_lines, ('
    714                         '"readlines returned wrong data.\\n" '
    715                         '"got lines %r\\nexpected  %r" '
    716                         '% (lines, expected_lines))'
    717                 ),
    718                 method_name='readlines',
    719                 universal_newlines=universal_newlines)
    720 
    def test_readlines_with_universal_newlines(self):
        # Re-run the readlines signal test with universal_newlines=True.
        self.test_readlines(universal_newlines=True)
    723 
    724     def test_readall(self):
    725         """Unbounded file.read() must handle signals and not lose data."""
    726         self._test_reading(
    727                 data_to_write='hello, world!abcdefghijklm',
    728                 read_and_verify_code=(
    729                         'data = infile.read() ;'
    730                         'expected_data = "hello, world!abcdefghijklm\\n";'
    731                         'assert data == expected_data, ('
    732                         '"read %r expected %r" % (data, expected_data))'
    733                 ),
    734                 method_name='unbounded read')
    735 
    736     def test_readinto(self):
    737         """file.readinto must handle signals and not lose data."""
    738         self._test_reading(
    739                 data_to_write='hello, world!',
    740                 read_and_verify_code=(
    741                         'data = bytearray(50) ;'
    742                         'num_read = infile.readinto(data) ;'
    743                         'expected_data = "hello, world!\\n";'
    744                         'assert data[:num_read] == expected_data, ('
    745                         '"read %r expected %r" % (data, expected_data))'
    746                 ),
    747                 method_name='readinto')
    748 
    749 
    750 class StdoutTests(unittest.TestCase):
    751 
    752     def test_move_stdout_on_write(self):
    753         # Issue 3242: sys.stdout can be replaced (and freed) during a
    754         # print statement; prevent a segfault in this case
    755         save_stdout = sys.stdout
    756 
    757         class File:
    758             def write(self, data):
    759                 if '\n' in data:
    760                     sys.stdout = save_stdout
    761 
    762         try:
    763             sys.stdout = File()
    764             print "some text"
    765         finally:
    766             sys.stdout = save_stdout
    767 
    768     def test_del_stdout_before_print(self):
    769         # Issue 4597: 'print' with no argument wasn't reporting when
    770         # sys.stdout was deleted.
    771         save_stdout = sys.stdout
    772         del sys.stdout
    773         try:
    774             print
    775         except RuntimeError as e:
    776             self.assertEqual(str(e), "lost sys.stdout")
    777         else:
    778             self.fail("Expected RuntimeError")
    779         finally:
    780             sys.stdout = save_stdout
    781 
    782     def test_unicode(self):
    783         import subprocess
    784 
    785         def get_message(encoding, *code):
    786             code = '\n'.join(code)
    787             env = os.environ.copy()
    788             env['PYTHONIOENCODING'] = encoding
    789             process = subprocess.Popen([sys.executable, "-c", code],
    790                                        stdout=subprocess.PIPE, env=env)
    791             stdout, stderr = process.communicate()
    792             self.assertEqual(process.returncode, 0)
    793             return stdout
    794 
    795         def check_message(text, encoding, expected):
    796             stdout = get_message(encoding,
    797                 "import sys",
    798                 "sys.stdout.write(%r)" % text,
    799                 "sys.stdout.flush()")
    800             self.assertEqual(stdout, expected)
    801 
    802         # test the encoding
    803         check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
    804         check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
    805         check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
    806 
    807         # test the error handler
    808         check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
    809         check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
    810         check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
    811 
    812         # test the buffer API
    813         for objtype in ('buffer', 'bytearray'):
    814             stdout = get_message('ascii',
    815                 'import sys',
    816                 r'sys.stdout.write(%s("\xe9"))' % objtype,
    817                 'sys.stdout.flush()')
    818             self.assertEqual(stdout, "\xe9")
    819 
    820 
    821 def test_main():
    822     # Historically, these tests have been sloppy about removing TESTFN.
    823     # So get rid of it no matter what.
    824     try:
    825         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
    826             FileThreadingTests, TestFileSignalEINTR, StdoutTests)
    827     finally:
    828         if os.path.exists(TESTFN):
    829             os.unlink(TESTFN)
    830 
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
    833