Home | History | Annotate | Download | only in test
      1 # -*- coding: iso-8859-1 -*-
      2 # Copyright (C) 2001,2002 Python Software Foundation
      3 # csv package unit tests
      4 
      5 import sys
      6 import os
      7 import unittest
      8 from StringIO import StringIO
      9 import tempfile
     10 import csv
     11 import gc
     12 import io
     13 from test import test_support
     14 
     15 class Test_Csv(unittest.TestCase):
     16     """
     17     Test the underlying C csv parser in ways that are not appropriate
     18     from the high level interface. Further tests of this nature are done
     19     in TestDialectRegistry.
     20     """
     21     def _test_arg_valid(self, ctor, arg):
     22         self.assertRaises(TypeError, ctor)
     23         self.assertRaises(TypeError, ctor, None)
     24         self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
     25         self.assertRaises(TypeError, ctor, arg, delimiter = 0)
     26         self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
     27         self.assertRaises(csv.Error, ctor, arg, 'foo')
     28         self.assertRaises(TypeError, ctor, arg, delimiter=None)
     29         self.assertRaises(TypeError, ctor, arg, delimiter=1)
     30         self.assertRaises(TypeError, ctor, arg, quotechar=1)
     31         self.assertRaises(TypeError, ctor, arg, lineterminator=None)
     32         self.assertRaises(TypeError, ctor, arg, lineterminator=1)
     33         self.assertRaises(TypeError, ctor, arg, quoting=None)
     34         self.assertRaises(TypeError, ctor, arg,
     35                           quoting=csv.QUOTE_ALL, quotechar='')
     36         self.assertRaises(TypeError, ctor, arg,
     37                           quoting=csv.QUOTE_ALL, quotechar=None)
     38 
     39     def test_reader_arg_valid(self):
     40         self._test_arg_valid(csv.reader, [])
     41 
     42     def test_writer_arg_valid(self):
     43         self._test_arg_valid(csv.writer, StringIO())
     44 
     45     def _test_default_attrs(self, ctor, *args):
     46         obj = ctor(*args)
     47         # Check defaults
     48         self.assertEqual(obj.dialect.delimiter, ',')
     49         self.assertEqual(obj.dialect.doublequote, True)
     50         self.assertEqual(obj.dialect.escapechar, None)
     51         self.assertEqual(obj.dialect.lineterminator, "\r\n")
     52         self.assertEqual(obj.dialect.quotechar, '"')
     53         self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
     54         self.assertEqual(obj.dialect.skipinitialspace, False)
     55         self.assertEqual(obj.dialect.strict, False)
     56         # Try deleting or changing attributes (they are read-only)
     57         self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
     58         self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
     59         self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
     60         self.assertRaises(AttributeError, setattr, obj.dialect,
     61                           'quoting', None)
     62 
     63     def test_reader_attrs(self):
     64         self._test_default_attrs(csv.reader, [])
     65 
     66     def test_writer_attrs(self):
     67         self._test_default_attrs(csv.writer, StringIO())
     68 
     69     def _test_kw_attrs(self, ctor, *args):
     70         # Now try with alternate options
     71         kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
     72                       lineterminator='\r', quotechar='*',
     73                       quoting=csv.QUOTE_NONE, skipinitialspace=True,
     74                       strict=True)
     75         obj = ctor(*args, **kwargs)
     76         self.assertEqual(obj.dialect.delimiter, ':')
     77         self.assertEqual(obj.dialect.doublequote, False)
     78         self.assertEqual(obj.dialect.escapechar, '\\')
     79         self.assertEqual(obj.dialect.lineterminator, "\r")
     80         self.assertEqual(obj.dialect.quotechar, '*')
     81         self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
     82         self.assertEqual(obj.dialect.skipinitialspace, True)
     83         self.assertEqual(obj.dialect.strict, True)
     84 
     85     def test_reader_kw_attrs(self):
     86         self._test_kw_attrs(csv.reader, [])
     87 
     88     def test_writer_kw_attrs(self):
     89         self._test_kw_attrs(csv.writer, StringIO())
     90 
     91     def _test_dialect_attrs(self, ctor, *args):
     92         # Now try with dialect-derived options
     93         class dialect:
     94             delimiter='-'
     95             doublequote=False
     96             escapechar='^'
     97             lineterminator='$'
     98             quotechar='#'
     99             quoting=csv.QUOTE_ALL
    100             skipinitialspace=True
    101             strict=False
    102         args = args + (dialect,)
    103         obj = ctor(*args)
    104         self.assertEqual(obj.dialect.delimiter, '-')
    105         self.assertEqual(obj.dialect.doublequote, False)
    106         self.assertEqual(obj.dialect.escapechar, '^')
    107         self.assertEqual(obj.dialect.lineterminator, "$")
    108         self.assertEqual(obj.dialect.quotechar, '#')
    109         self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
    110         self.assertEqual(obj.dialect.skipinitialspace, True)
    111         self.assertEqual(obj.dialect.strict, False)
    112 
    113     def test_reader_dialect_attrs(self):
    114         self._test_dialect_attrs(csv.reader, [])
    115 
    116     def test_writer_dialect_attrs(self):
    117         self._test_dialect_attrs(csv.writer, StringIO())
    118 
    119 
    120     def _write_test(self, fields, expect, **kwargs):
    121         fd, name = tempfile.mkstemp()
    122         fileobj = os.fdopen(fd, "w+b")
    123         try:
    124             writer = csv.writer(fileobj, **kwargs)
    125             writer.writerow(fields)
    126             fileobj.seek(0)
    127             self.assertEqual(fileobj.read(),
    128                              expect + writer.dialect.lineterminator)
    129         finally:
    130             fileobj.close()
    131             os.unlink(name)
    132 
    133     def test_write_arg_valid(self):
    134         self.assertRaises(csv.Error, self._write_test, None, '')
    135         self._write_test((), '')
    136         self._write_test([None], '""')
    137         self.assertRaises(csv.Error, self._write_test,
    138                           [None], None, quoting = csv.QUOTE_NONE)
    139         # Check that exceptions are passed up the chain
    140         class BadList:
    141             def __len__(self):
    142                 return 10;
    143             def __getitem__(self, i):
    144                 if i > 2:
    145                     raise IOError
    146         self.assertRaises(IOError, self._write_test, BadList(), '')
    147         class BadItem:
    148             def __str__(self):
    149                 raise IOError
    150         self.assertRaises(IOError, self._write_test, [BadItem()], '')
    151 
    152     def test_write_bigfield(self):
    153         # This exercises the buffer realloc functionality
    154         bigstring = 'X' * 50000
    155         self._write_test([bigstring,bigstring], '%s,%s' % \
    156                          (bigstring, bigstring))
    157 
    158     def test_write_quoting(self):
    159         self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
    160         self.assertRaises(csv.Error,
    161                           self._write_test,
    162                           ['a',1,'p,q'], 'a,1,p,q',
    163                           quoting = csv.QUOTE_NONE)
    164         self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
    165                          quoting = csv.QUOTE_MINIMAL)
    166         self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
    167                          quoting = csv.QUOTE_NONNUMERIC)
    168         self._write_test(['a',1,'p,q'], '"a","1","p,q"',
    169                          quoting = csv.QUOTE_ALL)
    170         self._write_test(['a\nb',1], '"a\nb","1"',
    171                          quoting = csv.QUOTE_ALL)
    172 
    173     def test_write_escape(self):
    174         self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
    175                          escapechar='\\')
    176         self.assertRaises(csv.Error,
    177                           self._write_test,
    178                           ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
    179                           escapechar=None, doublequote=False)
    180         self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
    181                          escapechar='\\', doublequote = False)
    182         self._write_test(['"'], '""""',
    183                          escapechar='\\', quoting = csv.QUOTE_MINIMAL)
    184         self._write_test(['"'], '\\"',
    185                          escapechar='\\', quoting = csv.QUOTE_MINIMAL,
    186                          doublequote = False)
    187         self._write_test(['"'], '\\"',
    188                          escapechar='\\', quoting = csv.QUOTE_NONE)
    189         self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
    190                          escapechar='\\', quoting = csv.QUOTE_NONE)
    191 
    192     def test_writerows(self):
    193         class BrokenFile:
    194             def write(self, buf):
    195                 raise IOError
    196         writer = csv.writer(BrokenFile())
    197         self.assertRaises(IOError, writer.writerows, [['a']])
    198         fd, name = tempfile.mkstemp()
    199         fileobj = os.fdopen(fd, "w+b")
    200         try:
    201             writer = csv.writer(fileobj)
    202             self.assertRaises(TypeError, writer.writerows, None)
    203             writer.writerows([['a','b'],['c','d']])
    204             fileobj.seek(0)
    205             self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
    206         finally:
    207             fileobj.close()
    208             os.unlink(name)
    209 
    210     def test_write_float(self):
    211         # Issue 13573: loss of precision because csv.writer
    212         # uses str() for floats instead of repr()
    213         orig_row = [1.234567890123, 1.0/7.0, 'abc']
    214         f = StringIO()
    215         c = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
    216         c.writerow(orig_row)
    217         f.seek(0)
    218         c = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
    219         new_row = next(c)
    220         self.assertEqual(orig_row, new_row)
    221 
    222     def _read_test(self, input, expect, **kwargs):
    223         reader = csv.reader(input, **kwargs)
    224         result = list(reader)
    225         self.assertEqual(result, expect)
    226 
    227     def test_read_oddinputs(self):
    228         self._read_test([], [])
    229         self._read_test([''], [[]])
    230         self.assertRaises(csv.Error, self._read_test,
    231                           ['"ab"c'], None, strict = 1)
    232         # cannot handle null bytes for the moment
    233         self.assertRaises(csv.Error, self._read_test,
    234                           ['ab\0c'], None, strict = 1)
    235         self._read_test(['"ab"c'], [['abc']], doublequote = 0)
    236 
    237     def test_read_eol(self):
    238         self._read_test(['a,b'], [['a','b']])
    239         self._read_test(['a,b\n'], [['a','b']])
    240         self._read_test(['a,b\r\n'], [['a','b']])
    241         self._read_test(['a,b\r'], [['a','b']])
    242         self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
    243         self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
    244         self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])
    245 
    246     def test_read_eof(self):
    247         self._read_test(['a,"'], [['a', '']])
    248         self._read_test(['"a'], [['a']])
    249         self._read_test(['^'], [['\n']], escapechar='^')
    250         self.assertRaises(csv.Error, self._read_test, ['a,"'], [], strict=True)
    251         self.assertRaises(csv.Error, self._read_test, ['"a'], [], strict=True)
    252         self.assertRaises(csv.Error, self._read_test,
    253                           ['^'], [], escapechar='^', strict=True)
    254 
    255     def test_read_escape(self):
    256         self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
    257         self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
    258         self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
    259         self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
    260         self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
    261         self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
    262 
    263     def test_read_quoting(self):
    264         self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
    265         self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
    266                         quotechar=None, escapechar='\\')
    267         self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
    268                         quoting=csv.QUOTE_NONE, escapechar='\\')
    269         # will this fail where locale uses comma for decimals?
    270         self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
    271                         quoting=csv.QUOTE_NONNUMERIC)
    272         self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
    273         self.assertRaises(ValueError, self._read_test,
    274                           ['abc,3'], [[]],
    275                           quoting=csv.QUOTE_NONNUMERIC)
    276 
    277     def test_read_bigfield(self):
    278         # This exercises the buffer realloc functionality and field size
    279         # limits.
    280         limit = csv.field_size_limit()
    281         try:
    282             size = 50000
    283             bigstring = 'X' * size
    284             bigline = '%s,%s' % (bigstring, bigstring)
    285             self._read_test([bigline], [[bigstring, bigstring]])
    286             csv.field_size_limit(size)
    287             self._read_test([bigline], [[bigstring, bigstring]])
    288             self.assertEqual(csv.field_size_limit(), size)
    289             csv.field_size_limit(size-1)
    290             self.assertRaises(csv.Error, self._read_test, [bigline], [])
    291             self.assertRaises(TypeError, csv.field_size_limit, None)
    292             self.assertRaises(TypeError, csv.field_size_limit, 1, None)
    293         finally:
    294             csv.field_size_limit(limit)
    295 
    296     def test_read_linenum(self):
    297         for r in (csv.reader(['line,1', 'line,2', 'line,3']),
    298                   csv.DictReader(['line,1', 'line,2', 'line,3'],
    299                                  fieldnames=['a', 'b', 'c'])):
    300             self.assertEqual(r.line_num, 0)
    301             r.next()
    302             self.assertEqual(r.line_num, 1)
    303             r.next()
    304             self.assertEqual(r.line_num, 2)
    305             r.next()
    306             self.assertEqual(r.line_num, 3)
    307             self.assertRaises(StopIteration, r.next)
    308             self.assertEqual(r.line_num, 3)
    309 
    310     def test_roundtrip_quoteed_newlines(self):
    311         fd, name = tempfile.mkstemp()
    312         fileobj = os.fdopen(fd, "w+b")
    313         try:
    314             writer = csv.writer(fileobj)
    315             self.assertRaises(TypeError, writer.writerows, None)
    316             rows = [['a\nb','b'],['c','x\r\nd']]
    317             writer.writerows(rows)
    318             fileobj.seek(0)
    319             for i, row in enumerate(csv.reader(fileobj)):
    320                 self.assertEqual(row, rows[i])
    321         finally:
    322             fileobj.close()
    323             os.unlink(name)
    324 
    325 class TestDialectRegistry(unittest.TestCase):
    326     def test_registry_badargs(self):
    327         self.assertRaises(TypeError, csv.list_dialects, None)
    328         self.assertRaises(TypeError, csv.get_dialect)
    329         self.assertRaises(csv.Error, csv.get_dialect, None)
    330         self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
    331         self.assertRaises(TypeError, csv.unregister_dialect)
    332         self.assertRaises(csv.Error, csv.unregister_dialect, None)
    333         self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
    334         self.assertRaises(TypeError, csv.register_dialect, None)
    335         self.assertRaises(TypeError, csv.register_dialect, None, None)
    336         self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
    337         self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
    338                           badargument=None)
    339         self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
    340                           quoting=None)
    341         self.assertRaises(TypeError, csv.register_dialect, [])
    342 
    343     def test_registry(self):
    344         class myexceltsv(csv.excel):
    345             delimiter = "\t"
    346         name = "myexceltsv"
    347         expected_dialects = csv.list_dialects() + [name]
    348         expected_dialects.sort()
    349         csv.register_dialect(name, myexceltsv)
    350         self.addCleanup(csv.unregister_dialect, name)
    351         self.assertEqual(csv.get_dialect(name).delimiter, '\t')
    352         got_dialects = sorted(csv.list_dialects())
    353         self.assertEqual(expected_dialects, got_dialects)
    354 
    355     def test_register_kwargs(self):
    356         name = 'fedcba'
    357         csv.register_dialect(name, delimiter=';')
    358         self.addCleanup(csv.unregister_dialect, name)
    359         self.assertEqual(csv.get_dialect(name).delimiter, ';')
    360         self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))
    361 
    362     def test_incomplete_dialect(self):
    363         class myexceltsv(csv.Dialect):
    364             delimiter = "\t"
    365         self.assertRaises(csv.Error, myexceltsv)
    366 
    367     def test_space_dialect(self):
    368         class space(csv.excel):
    369             delimiter = " "
    370             quoting = csv.QUOTE_NONE
    371             escapechar = "\\"
    372 
    373         fd, name = tempfile.mkstemp()
    374         fileobj = os.fdopen(fd, "w+b")
    375         try:
    376             fileobj.write("abc def\nc1ccccc1 benzene\n")
    377             fileobj.seek(0)
    378             rdr = csv.reader(fileobj, dialect=space())
    379             self.assertEqual(rdr.next(), ["abc", "def"])
    380             self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
    381         finally:
    382             fileobj.close()
    383             os.unlink(name)
    384 
    385     def test_dialect_apply(self):
    386         class testA(csv.excel):
    387             delimiter = "\t"
    388         class testB(csv.excel):
    389             delimiter = ":"
    390         class testC(csv.excel):
    391             delimiter = "|"
    392 
    393         csv.register_dialect('testC', testC)
    394         try:
    395             fd, name = tempfile.mkstemp()
    396             fileobj = os.fdopen(fd, "w+b")
    397             try:
    398                 writer = csv.writer(fileobj)
    399                 writer.writerow([1,2,3])
    400                 fileobj.seek(0)
    401                 self.assertEqual(fileobj.read(), "1,2,3\r\n")
    402             finally:
    403                 fileobj.close()
    404                 os.unlink(name)
    405 
    406             fd, name = tempfile.mkstemp()
    407             fileobj = os.fdopen(fd, "w+b")
    408             try:
    409                 writer = csv.writer(fileobj, testA)
    410                 writer.writerow([1,2,3])
    411                 fileobj.seek(0)
    412                 self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
    413             finally:
    414                 fileobj.close()
    415                 os.unlink(name)
    416 
    417             fd, name = tempfile.mkstemp()
    418             fileobj = os.fdopen(fd, "w+b")
    419             try:
    420                 writer = csv.writer(fileobj, dialect=testB())
    421                 writer.writerow([1,2,3])
    422                 fileobj.seek(0)
    423                 self.assertEqual(fileobj.read(), "1:2:3\r\n")
    424             finally:
    425                 fileobj.close()
    426                 os.unlink(name)
    427 
    428             fd, name = tempfile.mkstemp()
    429             fileobj = os.fdopen(fd, "w+b")
    430             try:
    431                 writer = csv.writer(fileobj, dialect='testC')
    432                 writer.writerow([1,2,3])
    433                 fileobj.seek(0)
    434                 self.assertEqual(fileobj.read(), "1|2|3\r\n")
    435             finally:
    436                 fileobj.close()
    437                 os.unlink(name)
    438 
    439             fd, name = tempfile.mkstemp()
    440             fileobj = os.fdopen(fd, "w+b")
    441             try:
    442                 writer = csv.writer(fileobj, dialect=testA, delimiter=';')
    443                 writer.writerow([1,2,3])
    444                 fileobj.seek(0)
    445                 self.assertEqual(fileobj.read(), "1;2;3\r\n")
    446             finally:
    447                 fileobj.close()
    448                 os.unlink(name)
    449 
    450         finally:
    451             csv.unregister_dialect('testC')
    452 
    453     def test_bad_dialect(self):
    454         # Unknown parameter
    455         self.assertRaises(TypeError, csv.reader, [], bad_attr = 0)
    456         # Bad values
    457         self.assertRaises(TypeError, csv.reader, [], delimiter = None)
    458         self.assertRaises(TypeError, csv.reader, [], quoting = -1)
    459         self.assertRaises(TypeError, csv.reader, [], quoting = 100)
    460 
    461 class TestCsvBase(unittest.TestCase):
    462     def readerAssertEqual(self, input, expected_result):
    463         fd, name = tempfile.mkstemp()
    464         fileobj = os.fdopen(fd, "w+b")
    465         try:
    466             fileobj.write(input)
    467             fileobj.seek(0)
    468             reader = csv.reader(fileobj, dialect = self.dialect)
    469             fields = list(reader)
    470             self.assertEqual(fields, expected_result)
    471         finally:
    472             fileobj.close()
    473             os.unlink(name)
    474 
    475     def writerAssertEqual(self, input, expected_result):
    476         fd, name = tempfile.mkstemp()
    477         fileobj = os.fdopen(fd, "w+b")
    478         try:
    479             writer = csv.writer(fileobj, dialect = self.dialect)
    480             writer.writerows(input)
    481             fileobj.seek(0)
    482             self.assertEqual(fileobj.read(), expected_result)
    483         finally:
    484             fileobj.close()
    485             os.unlink(name)
    486 
    487 class TestDialectExcel(TestCsvBase):
    488     dialect = 'excel'
    489 
    490     def test_single(self):
    491         self.readerAssertEqual('abc', [['abc']])
    492 
    493     def test_simple(self):
    494         self.readerAssertEqual('1,2,3,4,5', [['1','2','3','4','5']])
    495 
    496     def test_blankline(self):
    497         self.readerAssertEqual('', [])
    498 
    499     def test_empty_fields(self):
    500         self.readerAssertEqual(',', [['', '']])
    501 
    502     def test_singlequoted(self):
    503         self.readerAssertEqual('""', [['']])
    504 
    505     def test_singlequoted_left_empty(self):
    506         self.readerAssertEqual('"",', [['','']])
    507 
    508     def test_singlequoted_right_empty(self):
    509         self.readerAssertEqual(',""', [['','']])
    510 
    511     def test_single_quoted_quote(self):
    512         self.readerAssertEqual('""""', [['"']])
    513 
    514     def test_quoted_quotes(self):
    515         self.readerAssertEqual('""""""', [['""']])
    516 
    517     def test_inline_quote(self):
    518         self.readerAssertEqual('a""b', [['a""b']])
    519 
    520     def test_inline_quotes(self):
    521         self.readerAssertEqual('a"b"c', [['a"b"c']])
    522 
    523     def test_quotes_and_more(self):
    524         # Excel would never write a field containing '"a"b', but when
    525         # reading one, it will return 'ab'.
    526         self.readerAssertEqual('"a"b', [['ab']])
    527 
    528     def test_lone_quote(self):
    529         self.readerAssertEqual('a"b', [['a"b']])
    530 
    531     def test_quote_and_quote(self):
    532         # Excel would never write a field containing '"a" "b"', but when
    533         # reading one, it will return 'a "b"'.
    534         self.readerAssertEqual('"a" "b"', [['a "b"']])
    535 
    536     def test_space_and_quote(self):
    537         self.readerAssertEqual(' "a"', [[' "a"']])
    538 
    539     def test_quoted(self):
    540         self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
    541                                [['1', '2', '3',
    542                                  'I think, therefore I am',
    543                                  '5', '6']])
    544 
    545     def test_quoted_quote(self):
    546         self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
    547                                [['1', '2', '3',
    548                                  '"I see," said the blind man',
    549                                  'as he picked up his hammer and saw']])
    550 
    551     def test_quoted_nl(self):
    552         input = '''\
    553 1,2,3,"""I see,""
    554 said the blind man","as he picked up his
    555 hammer and saw"
    556 9,8,7,6'''
    557         self.readerAssertEqual(input,
    558                                [['1', '2', '3',
    559                                    '"I see,"\nsaid the blind man',
    560                                    'as he picked up his\nhammer and saw'],
    561                                 ['9','8','7','6']])
    562 
    563     def test_dubious_quote(self):
    564         self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])
    565 
    566     def test_null(self):
    567         self.writerAssertEqual([], '')
    568 
    569     def test_single_writer(self):
    570         self.writerAssertEqual([['abc']], 'abc\r\n')
    571 
    572     def test_simple_writer(self):
    573         self.writerAssertEqual([[1, 2, 'abc', 3, 4]], '1,2,abc,3,4\r\n')
    574 
    575     def test_quotes(self):
    576         self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], '1,2,"a""bc""",3,4\r\n')
    577 
    578     def test_quote_fieldsep(self):
    579         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
    580 
    581     def test_newlines(self):
    582         self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], '1,2,"a\nbc",3,4\r\n')
    583 
    584 class EscapedExcel(csv.excel):
    585     quoting = csv.QUOTE_NONE
    586     escapechar = '\\'
    587 
    588 class TestEscapedExcel(TestCsvBase):
    589     dialect = EscapedExcel()
    590 
    591     def test_escape_fieldsep(self):
    592         self.writerAssertEqual([['abc,def']], 'abc\\,def\r\n')
    593 
    594     def test_read_escape_fieldsep(self):
    595         self.readerAssertEqual('abc\\,def\r\n', [['abc,def']])
    596 
    597 class QuotedEscapedExcel(csv.excel):
    598     quoting = csv.QUOTE_NONNUMERIC
    599     escapechar = '\\'
    600 
    601 class TestQuotedEscapedExcel(TestCsvBase):
    602     dialect = QuotedEscapedExcel()
    603 
    604     def test_write_escape_fieldsep(self):
    605         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
    606 
    607     def test_read_escape_fieldsep(self):
    608         self.readerAssertEqual('"abc\\,def"\r\n', [['abc,def']])
    609 
    610 class TestDictFields(unittest.TestCase):
    611     ### "long" means the row is longer than the number of fieldnames
    612     ### "short" means there are fewer elements in the row than fieldnames
    def test_write_simple_dict(self):
        # Header row first, then one data row; the row dict has no "f2"
        # key and the expected output has an empty field in that position.
        handle, path = tempfile.mkstemp()
        stream = io.open(handle, 'w+b')
        try:
            dict_writer = csv.DictWriter(stream, fieldnames = ["f1", "f2", "f3"])
            dict_writer.writeheader()
            stream.seek(0)
            header_line = stream.readline()
            self.assertEqual(header_line, "f1,f2,f3\r\n")
            dict_writer.writerow({"f1": 10, "f3": "abc"})
            stream.seek(0)
            stream.readline() # skip over the header again
            remainder = stream.read()
            self.assertEqual(remainder, "10,,abc\r\n")
        finally:
            stream.close()
            os.unlink(path)
    628 
    629     def test_write_no_fields(self):
    630         fileobj = StringIO()
    631         self.assertRaises(TypeError, csv.DictWriter, fileobj)
    632 
    def test_read_dict_fields(self):
        # With explicit fieldnames the first line of the file is data.
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            # builtin next(), matching test_read_dict_fieldnames_chain
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)
    645 
    def test_read_dict_no_fieldnames(self):
        # Without explicit fieldnames the first line of the file supplies them.
        fd, name = tempfile.mkstemp()
        fileobj = os.fdopen(fd, "w+b")
        try:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            # builtin next(), matching test_read_dict_fieldnames_chain
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            fileobj.close()
            os.unlink(name)
    658 
    659     # Two test cases to make sure existing ways of implicitly setting
    660     # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        # The header is consumed by a plain csv.reader over the same file
        # and its first row is handed to DictReader as the fieldnames.
        fd, name = tempfile.mkstemp()
        f = os.fdopen(fd, "w+b")
        try:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            # builtin next(), matching test_read_dict_fieldnames_chain
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(name)
    673 
    def test_read_dict_fieldnames_chain(self):
        # Pull the first row eagerly, then chain it back in front of the
        # reader; fieldnames must stay available throughout the iteration.
        import itertools
        handle, path = tempfile.mkstemp()
        stream = os.fdopen(handle, "w+b")
        try:
            stream.write("f1,f2,f3\r\n1,2,abc\r\n")
            stream.seek(0)
            dict_reader = csv.DictReader(stream)
            head_row = next(dict_reader)
            all_rows = itertools.chain([head_row], dict_reader)
            for record in all_rows:
                self.assertEqual(dict_reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(record, {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            stream.close()
            os.unlink(path)
    689 
    def test_read_long(self):
        """Fields beyond the given field names are collected under None.

        With two field names and a six-field row, the four surplus values
        are expected as a list stored under the key None.
        """
        # The context manager closes and removes the temporary file on exit,
        # including when an assertion fails.
        with tempfile.TemporaryFile("w+b") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            None: ["abc", "4", "5", "6"]})
    703 
    def test_read_long_with_rest(self):
        """Surplus fields are stored under the key given as ``restkey``.

        With two field names, restkey="_rest" and a six-field row, the four
        surplus values are expected as a list under "_rest".
        """
        # The context manager closes and removes the temporary file on exit,
        # including when an assertion fails.
        with tempfile.TemporaryFile("w+b") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
    717 
    def test_read_long_with_rest_no_fieldnames(self):
        """``restkey`` also applies when field names come from the file.

        The two-column header row supplies the field names; the following
        six-field row must put its four surplus values under "_rest".
        """
        # The context manager closes and removes the temporary file on exit,
        # including when an assertion fails.
        with tempfile.TemporaryFile("w+b") as fileobj:
            fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})
    731 
    def test_read_short(self):
        """Missing trailing fields are filled in with ``restval``.

        The first row has all six fields; the second has only three, so the
        remaining three field names must map to "DEFAULT".
        """
        # The context manager closes and removes the temporary file on exit,
        # including when an assertion fails.
        with tempfile.TemporaryFile("w+b") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": '4', "5": '5', "6": '6'})
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": 'DEFAULT', "5": 'DEFAULT',
                                            "6": 'DEFAULT'})
    749 
    750     def test_read_multi(self):
    751         sample = [
    752             '2147483648,43.0e12,17,abc,def\r\n',
    753             '147483648,43.0e2,17,abc,def\r\n',
    754             '47483648,43.0,170,abc,def\r\n'
    755             ]
    756 
    757         reader = csv.DictReader(sample,
    758                                 fieldnames="i1 float i2 s1 s2".split())
    759         self.assertEqual(reader.next(), {"i1": '2147483648',
    760                                          "float": '43.0e12',
    761                                          "i2": '17',
    762                                          "s1": 'abc',
    763                                          "s2": 'def'})
    764 
    765     def test_read_with_blanks(self):
    766         reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
    767                                  "1,2,abc,4,5,6\r\n"],
    768                                 fieldnames="1 2 3 4 5 6".split())
    769         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
    770                                          "4": '4', "5": '5', "6": '6'})
    771         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
    772                                          "4": '4', "5": '5', "6": '6'})
    773 
    774     def test_read_semi_sep(self):
    775         reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
    776                                 fieldnames="1 2 3 4 5 6".split(),
    777                                 delimiter=';')
    778         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
    779                                          "4": '4', "5": '5', "6": '6'})
    780 
    781 class TestArrayWrites(unittest.TestCase):
    782     def test_int_write(self):
    783         import array
    784         contents = [(20-i) for i in range(20)]
    785         a = array.array('i', contents)
    786 
    787         fd, name = tempfile.mkstemp()
    788         fileobj = os.fdopen(fd, "w+b")
    789         try:
    790             writer = csv.writer(fileobj, dialect="excel")
    791             writer.writerow(a)
    792             expected = ",".join([str(i) for i in a])+"\r\n"
    793             fileobj.seek(0)
    794             self.assertEqual(fileobj.read(), expected)
    795         finally:
    796             fileobj.close()
    797             os.unlink(name)
    798 
    799     def test_double_write(self):
    800         import array
    801         contents = [(20-i)*0.1 for i in range(20)]
    802         a = array.array('d', contents)
    803         fd, name = tempfile.mkstemp()
    804         fileobj = os.fdopen(fd, "w+b")
    805         try:
    806             writer = csv.writer(fileobj, dialect="excel")
    807             writer.writerow(a)
    808             expected = ",".join([repr(i) for i in a])+"\r\n"
    809             fileobj.seek(0)
    810             self.assertEqual(fileobj.read(), expected)
    811         finally:
    812             fileobj.close()
    813             os.unlink(name)
    814 
    815     def test_float_write(self):
    816         import array
    817         contents = [(20-i)*0.1 for i in range(20)]
    818         a = array.array('f', contents)
    819         fd, name = tempfile.mkstemp()
    820         fileobj = os.fdopen(fd, "w+b")
    821         try:
    822             writer = csv.writer(fileobj, dialect="excel")
    823             writer.writerow(a)
    824             expected = ",".join([repr(i) for i in a])+"\r\n"
    825             fileobj.seek(0)
    826             self.assertEqual(fileobj.read(), expected)
    827         finally:
    828             fileobj.close()
    829             os.unlink(name)
    830 
    831     def test_char_write(self):
    832         import array, string
    833         a = array.array('c', string.letters)
    834         fd, name = tempfile.mkstemp()
    835         fileobj = os.fdopen(fd, "w+b")
    836         try:
    837             writer = csv.writer(fileobj, dialect="excel")
    838             writer.writerow(a)
    839             expected = ",".join(a)+"\r\n"
    840             fileobj.seek(0)
    841             self.assertEqual(fileobj.read(), expected)
    842         finally:
    843             fileobj.close()
    844             os.unlink(name)
    845 
    846 class TestDialectValidity(unittest.TestCase):
    847     def test_quoting(self):
    848         class mydialect(csv.Dialect):
    849             delimiter = ";"
    850             escapechar = '\\'
    851             doublequote = False
    852             skipinitialspace = True
    853             lineterminator = '\r\n'
    854             quoting = csv.QUOTE_NONE
    855         d = mydialect()
    856 
    857         mydialect.quoting = None
    858         self.assertRaises(csv.Error, mydialect)
    859 
    860         mydialect.doublequote = True
    861         mydialect.quoting = csv.QUOTE_ALL
    862         mydialect.quotechar = '"'
    863         d = mydialect()
    864 
    865         mydialect.quotechar = "''"
    866         self.assertRaises(csv.Error, mydialect)
    867 
    868         mydialect.quotechar = 4
    869         self.assertRaises(csv.Error, mydialect)
    870 
    871     def test_delimiter(self):
    872         class mydialect(csv.Dialect):
    873             delimiter = ";"
    874             escapechar = '\\'
    875             doublequote = False
    876             skipinitialspace = True
    877             lineterminator = '\r\n'
    878             quoting = csv.QUOTE_NONE
    879         d = mydialect()
    880 
    881         mydialect.delimiter = ":::"
    882         self.assertRaises(csv.Error, mydialect)
    883 
    884         mydialect.delimiter = 4
    885         self.assertRaises(csv.Error, mydialect)
    886 
    887     def test_lineterminator(self):
    888         class mydialect(csv.Dialect):
    889             delimiter = ";"
    890             escapechar = '\\'
    891             doublequote = False
    892             skipinitialspace = True
    893             lineterminator = '\r\n'
    894             quoting = csv.QUOTE_NONE
    895         d = mydialect()
    896 
    897         mydialect.lineterminator = ":::"
    898         d = mydialect()
    899 
    900         mydialect.lineterminator = 4
    901         self.assertRaises(csv.Error, mydialect)
    902 
    903 
    904 class TestSniffer(unittest.TestCase):
    905     sample1 = """\
    906 Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
    907 Shark City, Glendale Heights, IL, 12/28/02, Prezence
    908 Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
    909 Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
    910 """
    911     sample2 = """\
    912 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
    913 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
    914 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
    915 'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
    916 """
    917     header = '''\
    918 "venue","city","state","date","performers"
    919 '''
    920     sample3 = '''\
    921 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
    922 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
    923 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
    924 '''
    925 
    926     sample4 = '''\
    927 2147483648;43.0e12;17;abc;def
    928 147483648;43.0e2;17;abc;def
    929 47483648;43.0;170;abc;def
    930 '''
    931 
    932     sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    933     sample6 = "a|b|c\r\nd|e|f\r\n"
    934     sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"
    935 
    936     def test_has_header(self):
    937         sniffer = csv.Sniffer()
    938         self.assertEqual(sniffer.has_header(self.sample1), False)
    939         self.assertEqual(sniffer.has_header(self.header+self.sample1), True)
    940 
    941     def test_sniff(self):
    942         sniffer = csv.Sniffer()
    943         dialect = sniffer.sniff(self.sample1)
    944         self.assertEqual(dialect.delimiter, ",")
    945         self.assertEqual(dialect.quotechar, '"')
    946         self.assertEqual(dialect.skipinitialspace, True)
    947 
    948         dialect = sniffer.sniff(self.sample2)
    949         self.assertEqual(dialect.delimiter, ":")
    950         self.assertEqual(dialect.quotechar, "'")
    951         self.assertEqual(dialect.skipinitialspace, False)
    952 
    953     def test_delimiters(self):
    954         sniffer = csv.Sniffer()
    955         dialect = sniffer.sniff(self.sample3)
    956         # given that all three lines in sample3 are equal,
    957         # I think that any character could have been 'guessed' as the
    958         # delimiter, depending on dictionary order
    959         self.assertIn(dialect.delimiter, self.sample3)
    960         dialect = sniffer.sniff(self.sample3, delimiters="?,")
    961         self.assertEqual(dialect.delimiter, "?")
    962         dialect = sniffer.sniff(self.sample3, delimiters="/,")
    963         self.assertEqual(dialect.delimiter, "/")
    964         dialect = sniffer.sniff(self.sample4)
    965         self.assertEqual(dialect.delimiter, ";")
    966         dialect = sniffer.sniff(self.sample5)
    967         self.assertEqual(dialect.delimiter, "\t")
    968         dialect = sniffer.sniff(self.sample6)
    969         self.assertEqual(dialect.delimiter, "|")
    970         dialect = sniffer.sniff(self.sample7)
    971         self.assertEqual(dialect.delimiter, "|")
    972         self.assertEqual(dialect.quotechar, "'")
    973 
    974     def test_doublequote(self):
    975         sniffer = csv.Sniffer()
    976         dialect = sniffer.sniff(self.header)
    977         self.assertFalse(dialect.doublequote)
    978         dialect = sniffer.sniff(self.sample2)
    979         self.assertTrue(dialect.doublequote)
    980 
    981 if not hasattr(sys, "gettotalrefcount"):
    982     if test_support.verbose: print "*** skipping leakage tests ***"
    983 else:
    984     class NUL:
    985         def write(s, *args):
    986             pass
    987         writelines = write
    988 
    989     class TestLeaks(unittest.TestCase):
    990         def test_create_read(self):
    991             delta = 0
    992             lastrc = sys.gettotalrefcount()
    993             for i in xrange(20):
    994                 gc.collect()
    995                 self.assertEqual(gc.garbage, [])
    996                 rc = sys.gettotalrefcount()
    997                 csv.reader(["a,b,c\r\n"])
    998                 csv.reader(["a,b,c\r\n"])
    999                 csv.reader(["a,b,c\r\n"])
   1000                 delta = rc-lastrc
   1001                 lastrc = rc
   1002             # if csv.reader() leaks, last delta should be 3 or more
   1003             self.assertEqual(delta < 3, True)
   1004 
   1005         def test_create_write(self):
   1006             delta = 0
   1007             lastrc = sys.gettotalrefcount()
   1008             s = NUL()
   1009             for i in xrange(20):
   1010                 gc.collect()
   1011                 self.assertEqual(gc.garbage, [])
   1012                 rc = sys.gettotalrefcount()
   1013                 csv.writer(s)
   1014                 csv.writer(s)
   1015                 csv.writer(s)
   1016                 delta = rc-lastrc
   1017                 lastrc = rc
   1018             # if csv.writer() leaks, last delta should be 3 or more
   1019             self.assertEqual(delta < 3, True)
   1020 
   1021         def test_read(self):
   1022             delta = 0
   1023             rows = ["a,b,c\r\n"]*5
   1024             lastrc = sys.gettotalrefcount()
   1025             for i in xrange(20):
   1026                 gc.collect()
   1027                 self.assertEqual(gc.garbage, [])
   1028                 rc = sys.gettotalrefcount()
   1029                 rdr = csv.reader(rows)
   1030                 for row in rdr:
   1031                     pass
   1032                 delta = rc-lastrc
   1033                 lastrc = rc
   1034             # if reader leaks during read, delta should be 5 or more
   1035             self.assertEqual(delta < 5, True)
   1036 
   1037         def test_write(self):
   1038             delta = 0
   1039             rows = [[1,2,3]]*5
   1040             s = NUL()
   1041             lastrc = sys.gettotalrefcount()
   1042             for i in xrange(20):
   1043                 gc.collect()
   1044                 self.assertEqual(gc.garbage, [])
   1045                 rc = sys.gettotalrefcount()
   1046                 writer = csv.writer(s)
   1047                 for row in rows:
   1048                     writer.writerow(row)
   1049                 delta = rc-lastrc
   1050                 lastrc = rc
   1051             # if writer leaks during write, last delta should be 5 or more
   1052             self.assertEqual(delta < 5, True)
   1053 
   1054 # commented out for now - csv module doesn't yet support Unicode
   1055 ## class TestUnicode(unittest.TestCase):
   1056 ##     def test_unicode_read(self):
   1057 ##         import codecs
##         f = codecs.EncodedFile(StringIO("Martin von Löwis,"
##                                         "Marc André Lemburg,"
##                                         "Guido van Rossum,"
##                                         "François Pinard\r\n"),
##                                data_encoding='iso-8859-1')
##         reader = csv.reader(f)
##         self.assertEqual(list(reader), [[u"Martin von Löwis",
##                                          u"Marc André Lemburg",
##                                          u"Guido van Rossum",
##                                          u"François Pinardn"]])
   1068 
   1069 def test_main():
   1070     mod = sys.modules[__name__]
   1071     test_support.run_unittest(
   1072         *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
   1073     )
   1074 
   1075 if __name__ == '__main__':
   1076     test_main()
   1077