Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import unittest
      4 import itertools
      5 import time
      6 from array import array
      7 from weakref import proxy
      8 try:
      9     import threading
     10 except ImportError:
     11     threading = None
     12 
     13 from test import test_support
     14 from test.test_support import TESTFN, run_unittest
     15 from UserList import UserList
     16 
     17 class AutoFileTests(unittest.TestCase):
     18     # file tests for which a test file is automatically set up

     19 
     20     def setUp(self):
     21         self.f = open(TESTFN, 'wb')
     22 
     23     def tearDown(self):
     24         if self.f:
     25             self.f.close()
     26         os.remove(TESTFN)
     27 
     28     def testWeakRefs(self):
     29         # verify weak references

     30         p = proxy(self.f)
     31         p.write('teststring')
     32         self.assertEqual(self.f.tell(), p.tell())
     33         self.f.close()
     34         self.f = None
     35         self.assertRaises(ReferenceError, getattr, p, 'tell')
     36 
     37     def testAttributes(self):
     38         # verify expected attributes exist

     39         f = self.f
     40         with test_support.check_py3k_warnings():
     41             softspace = f.softspace
     42         f.name     # merely shouldn't blow up

     43         f.mode     # ditto

     44         f.closed   # ditto

     45 
     46         with test_support.check_py3k_warnings():
     47             # verify softspace is writable

     48             f.softspace = softspace    # merely shouldn't blow up

     49 
     50         # verify the others aren't

     51         for attr in 'name', 'mode', 'closed':
     52             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
     53 
     54     def testReadinto(self):
     55         # verify readinto

     56         self.f.write('12')
     57         self.f.close()
     58         a = array('c', 'x'*10)
     59         self.f = open(TESTFN, 'rb')
     60         n = self.f.readinto(a)
     61         self.assertEqual('12', a.tostring()[:n])
     62 
     63     def testWritelinesUserList(self):
     64         # verify writelines with instance sequence

     65         l = UserList(['1', '2'])
     66         self.f.writelines(l)
     67         self.f.close()
     68         self.f = open(TESTFN, 'rb')
     69         buf = self.f.read()
     70         self.assertEqual(buf, '12')
     71 
     72     def testWritelinesIntegers(self):
     73         # verify writelines with integers

     74         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     75 
     76     def testWritelinesIntegersUserList(self):
     77         # verify writelines with integers in UserList

     78         l = UserList([1,2,3])
     79         self.assertRaises(TypeError, self.f.writelines, l)
     80 
     81     def testWritelinesNonString(self):
     82         # verify writelines with non-string object

     83         class NonString:
     84             pass
     85 
     86         self.assertRaises(TypeError, self.f.writelines,
     87                           [NonString(), NonString()])
     88 
     89     def testRepr(self):
     90         # verify repr works

     91         self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
     92 
     93     def testErrors(self):
     94         self.f.close()
     95         self.f = open(TESTFN, 'rb')
     96         f = self.f
     97         self.assertEqual(f.name, TESTFN)
     98         self.assertTrue(not f.isatty())
     99         self.assertTrue(not f.closed)
    100 
    101         self.assertRaises(TypeError, f.readinto, "")
    102         f.close()
    103         self.assertTrue(f.closed)
    104 
    105     def testMethods(self):
    106         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
    107                    'readline', 'readlines', 'seek', 'tell', 'truncate',
    108                    'write', '__iter__']
    109         deprecated_methods = ['xreadlines']
    110         if sys.platform.startswith('atheos'):
    111             methods.remove('truncate')
    112 
    113         # __exit__ should close the file

    114         self.f.__exit__(None, None, None)
    115         self.assertTrue(self.f.closed)
    116 
    117         for methodname in methods:
    118             method = getattr(self.f, methodname)
    119             # should raise on closed file

    120             self.assertRaises(ValueError, method)
    121         with test_support.check_py3k_warnings():
    122             for methodname in deprecated_methods:
    123                 method = getattr(self.f, methodname)
    124                 self.assertRaises(ValueError, method)
    125         self.assertRaises(ValueError, self.f.writelines, [])
    126 
    127         # file is closed, __exit__ shouldn't do anything

    128         self.assertEqual(self.f.__exit__(None, None, None), None)
    129         # it must also return None if an exception was given

    130         try:
    131             1 // 0
    132         except:
    133             self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
    134 
    135     def testReadWhenWriting(self):
    136         self.assertRaises(IOError, self.f.read)
    137 
    138     def testNastyWritelinesGenerator(self):
    139         def nasty():
    140             for i in range(5):
    141                 if i == 3:
    142                     self.f.close()
    143                 yield str(i)
    144         self.assertRaises(ValueError, self.f.writelines, nasty())
    145 
    146     def testIssue5677(self):
    147         # Remark: Do not perform more than one test per open file,

    148         # since that does NOT catch the readline error on Windows.

    149         data = 'xxx'
    150         for mode in ['w', 'wb', 'a', 'ab']:
    151             for attr in ['read', 'readline', 'readlines']:
    152                 self.f = open(TESTFN, mode)
    153                 self.f.write(data)
    154                 self.assertRaises(IOError, getattr(self.f, attr))
    155                 self.f.close()
    156 
    157             self.f = open(TESTFN, mode)
    158             self.f.write(data)
    159             self.assertRaises(IOError, lambda: [line for line in self.f])
    160             self.f.close()
    161 
    162             self.f = open(TESTFN, mode)
    163             self.f.write(data)
    164             self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
    165             self.f.close()
    166 
    167         for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
    168             self.f = open(TESTFN, mode)
    169             self.assertRaises(IOError, self.f.write, data)
    170             self.f.close()
    171 
    172             self.f = open(TESTFN, mode)
    173             self.assertRaises(IOError, self.f.writelines, [data, data])
    174             self.f.close()
    175 
    176             self.f = open(TESTFN, mode)
    177             self.assertRaises(IOError, self.f.truncate)
    178             self.f.close()
    179 
    180 class OtherFileTests(unittest.TestCase):
    181 
    182     def testOpenDir(self):
    183         this_dir = os.path.dirname(__file__) or os.curdir
    184         for mode in (None, "w"):
    185             try:
    186                 if mode:
    187                     f = open(this_dir, mode)
    188                 else:
    189                     f = open(this_dir)
    190             except IOError as e:
    191                 self.assertEqual(e.filename, this_dir)
    192             else:
    193                 self.fail("opening a directory didn't raise an IOError")
    194 
    195     def testModeStrings(self):
    196         # check invalid mode strings

    197         for mode in ("", "aU", "wU+"):
    198             try:
    199                 f = open(TESTFN, mode)
    200             except ValueError:
    201                 pass
    202             else:
    203                 f.close()
    204                 self.fail('%r is an invalid file mode' % mode)
    205 
    206         # Some invalid modes fail on Windows, but pass on Unix

    207         # Issue3965: avoid a crash on Windows when filename is unicode

    208         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
    209             try:
    210                 f = open(name, "rr")
    211             except (IOError, ValueError):
    212                 pass
    213             else:
    214                 f.close()
    215 
    216     def testStdin(self):
    217         # This causes the interpreter to exit on OSF1 v5.1.

    218         if sys.platform != 'osf1V5':
    219             self.assertRaises(IOError, sys.stdin.seek, -1)
    220         else:
    221             print >>sys.__stdout__, (
    222                 '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
    223                 ' Test manually.')
    224         self.assertRaises(IOError, sys.stdin.truncate)
    225 
    226     def testUnicodeOpen(self):
    227         # verify repr works for unicode too

    228         f = open(unicode(TESTFN), "w")
    229         self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
    230         f.close()
    231         os.unlink(TESTFN)
    232 
    233     def testBadModeArgument(self):
    234         # verify that we get a sensible error message for bad mode argument

    235         bad_mode = "qwerty"
    236         try:
    237             f = open(TESTFN, bad_mode)
    238         except ValueError, msg:
    239             if msg.args[0] != 0:
    240                 s = str(msg)
    241                 if TESTFN in s or bad_mode not in s:
    242                     self.fail("bad error message for invalid mode: %s" % s)
    243             # if msg.args[0] == 0, we're probably on Windows where there may

    244             # be no obvious way to discover why open() failed.

    245         else:
    246             f.close()
    247             self.fail("no error for invalid mode: %s" % bad_mode)
    248 
    249     def testSetBufferSize(self):
    250         # make sure that explicitly setting the buffer size doesn't cause

    251         # misbehaviour especially with repeated close() calls

    252         for s in (-1, 0, 1, 512):
    253             try:
    254                 f = open(TESTFN, 'w', s)
    255                 f.write(str(s))
    256                 f.close()
    257                 f.close()
    258                 f = open(TESTFN, 'r', s)
    259                 d = int(f.read())
    260                 f.close()
    261                 f.close()
    262             except IOError, msg:
    263                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
    264             self.assertEqual(d, s)
    265 
    266     def testTruncateOnWindows(self):
    267         os.unlink(TESTFN)
    268 
    269         def bug801631():
    270             # SF bug <http://www.python.org/sf/801631>

    271             # "file.truncate fault on windows"

    272             f = open(TESTFN, 'wb')
    273             f.write('12345678901')   # 11 bytes

    274             f.close()
    275 
    276             f = open(TESTFN,'rb+')
    277             data = f.read(5)
    278             if data != '12345':
    279                 self.fail("Read on file opened for update failed %r" % data)
    280             if f.tell() != 5:
    281                 self.fail("File pos after read wrong %d" % f.tell())
    282 
    283             f.truncate()
    284             if f.tell() != 5:
    285                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    286 
    287             f.close()
    288             size = os.path.getsize(TESTFN)
    289             if size != 5:
    290                 self.fail("File size after ftruncate wrong %d" % size)
    291 
    292         try:
    293             bug801631()
    294         finally:
    295             os.unlink(TESTFN)
    296 
    297     def testIteration(self):
    298         # Test the complex interaction when mixing file-iteration and the

    299         # various read* methods. Ostensibly, the mixture could just be tested

    300         # to work when it should work according to the Python language,

    301         # instead of fail when it should fail according to the current CPython

    302         # implementation.  People don't always program Python the way they

    303         # should, though, and the implemenation might change in subtle ways,

    304         # so we explicitly test for errors, too; the test will just have to

    305         # be updated when the implementation changes.

    306         dataoffset = 16384
    307         filler = "ham\n"
    308         assert not dataoffset % len(filler), \
    309             "dataoffset must be multiple of len(filler)"
    310         nchunks = dataoffset // len(filler)
    311         testlines = [
    312             "spam, spam and eggs\n",
    313             "eggs, spam, ham and spam\n",
    314             "saussages, spam, spam and eggs\n",
    315             "spam, ham, spam and eggs\n",
    316             "spam, spam, spam, spam, spam, ham, spam\n",
    317             "wonderful spaaaaaam.\n"
    318         ]
    319         methods = [("readline", ()), ("read", ()), ("readlines", ()),
    320                    ("readinto", (array("c", " "*100),))]
    321 
    322         try:
    323             # Prepare the testfile

    324             bag = open(TESTFN, "w")
    325             bag.write(filler * nchunks)
    326             bag.writelines(testlines)
    327             bag.close()
    328             # Test for appropriate errors mixing read* and iteration

    329             for methodname, args in methods:
    330                 f = open(TESTFN)
    331                 if f.next() != filler:
    332                     self.fail, "Broken testfile"
    333                 meth = getattr(f, methodname)
    334                 try:
    335                     meth(*args)
    336                 except ValueError:
    337                     pass
    338                 else:
    339                     self.fail("%s%r after next() didn't raise ValueError" %
    340                                      (methodname, args))
    341                 f.close()
    342 
    343             # Test to see if harmless (by accident) mixing of read* and

    344             # iteration still works. This depends on the size of the internal

    345             # iteration buffer (currently 8192,) but we can test it in a

    346             # flexible manner.  Each line in the bag o' ham is 4 bytes

    347             # ("h", "a", "m", "\n"), so 4096 lines of that should get us

    348             # exactly on the buffer boundary for any power-of-2 buffersize

    349             # between 4 and 16384 (inclusive).

    350             f = open(TESTFN)
    351             for i in range(nchunks):
    352                 f.next()
    353             testline = testlines.pop(0)
    354             try:
    355                 line = f.readline()
    356             except ValueError:
    357                 self.fail("readline() after next() with supposedly empty "
    358                           "iteration-buffer failed anyway")
    359             if line != testline:
    360                 self.fail("readline() after next() with empty buffer "
    361                           "failed. Got %r, expected %r" % (line, testline))
    362             testline = testlines.pop(0)
    363             buf = array("c", "\x00" * len(testline))
    364             try:
    365                 f.readinto(buf)
    366             except ValueError:
    367                 self.fail("readinto() after next() with supposedly empty "
    368                           "iteration-buffer failed anyway")
    369             line = buf.tostring()
    370             if line != testline:
    371                 self.fail("readinto() after next() with empty buffer "
    372                           "failed. Got %r, expected %r" % (line, testline))
    373 
    374             testline = testlines.pop(0)
    375             try:
    376                 line = f.read(len(testline))
    377             except ValueError:
    378                 self.fail("read() after next() with supposedly empty "
    379                           "iteration-buffer failed anyway")
    380             if line != testline:
    381                 self.fail("read() after next() with empty buffer "
    382                           "failed. Got %r, expected %r" % (line, testline))
    383             try:
    384                 lines = f.readlines()
    385             except ValueError:
    386                 self.fail("readlines() after next() with supposedly empty "
    387                           "iteration-buffer failed anyway")
    388             if lines != testlines:
    389                 self.fail("readlines() after next() with empty buffer "
    390                           "failed. Got %r, expected %r" % (line, testline))
    391             # Reading after iteration hit EOF shouldn't hurt either

    392             f = open(TESTFN)
    393             try:
    394                 for line in f:
    395                     pass
    396                 try:
    397                     f.readline()
    398                     f.readinto(buf)
    399                     f.read()
    400                     f.readlines()
    401                 except ValueError:
    402                     self.fail("read* failed after next() consumed file")
    403             finally:
    404                 f.close()
    405         finally:
    406             os.unlink(TESTFN)
    407 
    408 class FileSubclassTests(unittest.TestCase):
    409 
    410     def testExit(self):
    411         # test that exiting with context calls subclass' close

    412         class C(file):
    413             def __init__(self, *args):
    414                 self.subclass_closed = False
    415                 file.__init__(self, *args)
    416             def close(self):
    417                 self.subclass_closed = True
    418                 file.close(self)
    419 
    420         with C(TESTFN, 'w') as f:
    421             pass
    422         self.assertTrue(f.subclass_closed)
    423 
    424 
    425 @unittest.skipUnless(threading, 'Threading required for this test.')
    426 class FileThreadingTests(unittest.TestCase):
    427     # These tests check the ability to call various methods of file objects

    428     # (including close()) concurrently without crashing the Python interpreter.

    429     # See #815646, #595601

    430 
    431     def setUp(self):
    432         self._threads = test_support.threading_setup()
    433         self.f = None
    434         self.filename = TESTFN
    435         with open(self.filename, "w") as f:
    436             f.write("\n".join("0123456789"))
    437         self._count_lock = threading.Lock()
    438         self.close_count = 0
    439         self.close_success_count = 0
    440         self.use_buffering = False
    441 
    442     def tearDown(self):
    443         if self.f:
    444             try:
    445                 self.f.close()
    446             except (EnvironmentError, ValueError):
    447                 pass
    448         try:
    449             os.remove(self.filename)
    450         except EnvironmentError:
    451             pass
    452         test_support.threading_cleanup(*self._threads)
    453 
    454     def _create_file(self):
    455         if self.use_buffering:
    456             self.f = open(self.filename, "w+", buffering=1024*16)
    457         else:
    458             self.f = open(self.filename, "w+")
    459 
    460     def _close_file(self):
    461         with self._count_lock:
    462             self.close_count += 1
    463         self.f.close()
    464         with self._count_lock:
    465             self.close_success_count += 1
    466 
    467     def _close_and_reopen_file(self):
    468         self._close_file()
    469         # if close raises an exception thats fine, self.f remains valid so

    470         # we don't need to reopen.

    471         self._create_file()
    472 
    473     def _run_workers(self, func, nb_workers, duration=0.2):
    474         with self._count_lock:
    475             self.close_count = 0
    476             self.close_success_count = 0
    477         self.do_continue = True
    478         threads = []
    479         try:
    480             for i in range(nb_workers):
    481                 t = threading.Thread(target=func)
    482                 t.start()
    483                 threads.append(t)
    484             for _ in xrange(100):
    485                 time.sleep(duration/100)
    486                 with self._count_lock:
    487                     if self.close_count-self.close_success_count > nb_workers+1:
    488                         if test_support.verbose:
    489                             print 'Q',
    490                         break
    491             time.sleep(duration)
    492         finally:
    493             self.do_continue = False
    494             for t in threads:
    495                 t.join()
    496 
    497     def _test_close_open_io(self, io_func, nb_workers=5):
    498         def worker():
    499             self._create_file()
    500             funcs = itertools.cycle((
    501                 lambda: io_func(),
    502                 lambda: self._close_and_reopen_file(),
    503             ))
    504             for f in funcs:
    505                 if not self.do_continue:
    506                     break
    507                 try:
    508                     f()
    509                 except (IOError, ValueError):
    510                     pass
    511         self._run_workers(worker, nb_workers)
    512         if test_support.verbose:
    513             # Useful verbose statistics when tuning this test to take

    514             # less time to run but still ensuring that its still useful.

    515             #

    516             # the percent of close calls that raised an error

    517             percent = 100. - 100.*self.close_success_count/self.close_count
    518             print self.close_count, ('%.4f ' % percent),
    519 
    520     def test_close_open(self):
    521         def io_func():
    522             pass
    523         self._test_close_open_io(io_func)
    524 
    525     def test_close_open_flush(self):
    526         def io_func():
    527             self.f.flush()
    528         self._test_close_open_io(io_func)
    529 
    530     def test_close_open_iter(self):
    531         def io_func():
    532             list(iter(self.f))
    533         self._test_close_open_io(io_func)
    534 
    535     def test_close_open_isatty(self):
    536         def io_func():
    537             self.f.isatty()
    538         self._test_close_open_io(io_func)
    539 
    540     def test_close_open_print(self):
    541         def io_func():
    542             print >> self.f, ''
    543         self._test_close_open_io(io_func)
    544 
    545     def test_close_open_print_buffered(self):
    546         self.use_buffering = True
    547         def io_func():
    548             print >> self.f, ''
    549         self._test_close_open_io(io_func)
    550 
    551     def test_close_open_read(self):
    552         def io_func():
    553             self.f.read(0)
    554         self._test_close_open_io(io_func)
    555 
    556     def test_close_open_readinto(self):
    557         def io_func():
    558             a = array('c', 'xxxxx')
    559             self.f.readinto(a)
    560         self._test_close_open_io(io_func)
    561 
    562     def test_close_open_readline(self):
    563         def io_func():
    564             self.f.readline()
    565         self._test_close_open_io(io_func)
    566 
    567     def test_close_open_readlines(self):
    568         def io_func():
    569             self.f.readlines()
    570         self._test_close_open_io(io_func)
    571 
    572     def test_close_open_seek(self):
    573         def io_func():
    574             self.f.seek(0, 0)
    575         self._test_close_open_io(io_func)
    576 
    577     def test_close_open_tell(self):
    578         def io_func():
    579             self.f.tell()
    580         self._test_close_open_io(io_func)
    581 
    582     def test_close_open_truncate(self):
    583         def io_func():
    584             self.f.truncate()
    585         self._test_close_open_io(io_func)
    586 
    587     def test_close_open_write(self):
    588         def io_func():
    589             self.f.write('')
    590         self._test_close_open_io(io_func)
    591 
    592     def test_close_open_writelines(self):
    593         def io_func():
    594             self.f.writelines('')
    595         self._test_close_open_io(io_func)
    596 
    597 
    598 class StdoutTests(unittest.TestCase):
    599 
    600     def test_move_stdout_on_write(self):
    601         # Issue 3242: sys.stdout can be replaced (and freed) during a

    602         # print statement; prevent a segfault in this case

    603         save_stdout = sys.stdout
    604 
    605         class File:
    606             def write(self, data):
    607                 if '\n' in data:
    608                     sys.stdout = save_stdout
    609 
    610         try:
    611             sys.stdout = File()
    612             print "some text"
    613         finally:
    614             sys.stdout = save_stdout
    615 
    616     def test_del_stdout_before_print(self):
    617         # Issue 4597: 'print' with no argument wasn't reporting when

    618         # sys.stdout was deleted.

    619         save_stdout = sys.stdout
    620         del sys.stdout
    621         try:
    622             print
    623         except RuntimeError as e:
    624             self.assertEqual(str(e), "lost sys.stdout")
    625         else:
    626             self.fail("Expected RuntimeError")
    627         finally:
    628             sys.stdout = save_stdout
    629 
    630     def test_unicode(self):
    631         import subprocess
    632 
    633         def get_message(encoding, *code):
    634             code = '\n'.join(code)
    635             env = os.environ.copy()
    636             env['PYTHONIOENCODING'] = encoding
    637             process = subprocess.Popen([sys.executable, "-c", code],
    638                                        stdout=subprocess.PIPE, env=env)
    639             stdout, stderr = process.communicate()
    640             self.assertEqual(process.returncode, 0)
    641             return stdout
    642 
    643         def check_message(text, encoding, expected):
    644             stdout = get_message(encoding,
    645                 "import sys",
    646                 "sys.stdout.write(%r)" % text,
    647                 "sys.stdout.flush()")
    648             self.assertEqual(stdout, expected)
    649 
    650         # test the encoding

    651         check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
    652         check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
    653         check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
    654 
    655         # test the error handler

    656         check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
    657         check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
    658         check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
    659 
    660         # test the buffer API

    661         for objtype in ('buffer', 'bytearray'):
    662             stdout = get_message('ascii',
    663                 'import sys',
    664                 r'sys.stdout.write(%s("\xe9"))' % objtype,
    665                 'sys.stdout.flush()')
    666             self.assertEqual(stdout, "\xe9")
    667 
    668 
    669 def test_main():
    670     # Historically, these tests have been sloppy about removing TESTFN.

    671     # So get rid of it no matter what.

    672     try:
    673         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
    674             FileThreadingTests, StdoutTests)
    675     finally:
    676         if os.path.exists(TESTFN):
    677             os.unlink(TESTFN)
    678 
    679 if __name__ == '__main__':
    680     test_main()
    681