# -*- coding: iso-8859-1 -*-
# Copyright (C) 2001,2002 Python Software Foundation
# csv package unit tests
#
# NOTE: test methods are documented with comments rather than docstrings on
# purpose: unittest prints the first docstring line instead of the test name
# in verbose mode, which makes failures harder to locate.

import contextlib
import csv
import gc
import io
import os
import sys
import tempfile
import unittest
from StringIO import StringIO

from test import test_support


@contextlib.contextmanager
def _temp_binary_file(opener=os.fdopen):
    """Yield a fresh read/write binary temporary file, then close and delete it.

    The file is created with tempfile.mkstemp() and wrapped by calling
    ``opener(fd, "w+b")``; os.fdopen is the default, io.open is accepted as
    well.  The file object is closed and the file unlinked on exit, whether
    or not the ``with`` body raised.
    """
    fd, name = tempfile.mkstemp()
    fileobj = opener(fd, "w+b")
    try:
        yield fileobj
    finally:
        fileobj.close()
        os.unlink(name)


class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """
    def _test_arg_valid(self, ctor, arg):
        """Check that ctor (csv.reader or csv.writer) rejects bad arguments.

        arg is a valid first positional argument for ctor.
        """
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr=0)
        self.assertRaises(TypeError, ctor, arg, delimiter=0)
        self.assertRaises(TypeError, ctor, arg, delimiter='XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)

    def test_reader_arg_valid(self):
        self._test_arg_valid(csv.reader, [])

    def test_writer_arg_valid(self):
        self._test_arg_valid(csv.writer, StringIO())

    def _test_default_attrs(self, ctor, *args):
        """Check the default dialect attributes of ctor(*args)."""
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        """Check that keyword options passed to ctor show up on obj.dialect."""
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        """Check that options taken from a dialect class show up on obj.dialect."""
        # Now try with dialect-derived options
        class dialect:
            delimiter = '-'
            doublequote = False
            escapechar = '^'
            lineterminator = '$'
            quotechar = '#'
            quoting = csv.QUOTE_ALL
            skipinitialspace = True
            strict = False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())

    def _write_test(self, fields, expect, **kwargs):
        """Write one row through csv.writer(**kwargs) and compare the output.

        The file content must equal expect followed by the writer dialect's
        line terminator.  Exceptions raised by the writer propagate.
        """
        with _temp_binary_file() as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator)

    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring, bigstring],
                         '%s,%s' % (bigstring, bigstring))

    def test_write_quoting(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,q'], 'a,1,p,q',
                          quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb', 1], '"a\nb","1"',
                         quoting=csv.QUOTE_ALL)

    def test_write_escape(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)

    def test_writerows(self):
        class BrokenFile:
            def write(self, buf):
                raise IOError
        writer = csv.writer(BrokenFile())
        self.assertRaises(IOError, writer.writerows, [['a']])
        with _temp_binary_file() as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")

    def test_write_float(self):
        # Issue 13573: loss of precision because csv.writer
        # uses str() for floats instead of repr()
        orig_row = [1.234567890123, 1.0/7.0, 'abc']
        f = StringIO()
        c = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
        c.writerow(orig_row)
        f.seek(0)
        c = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
        new_row = next(c)
        self.assertEqual(orig_row, new_row)

    def _read_test(self, input, expect, **kwargs):
        """Parse the iterable input with csv.reader(**kwargs); compare all rows."""
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict=1)
        # cannot handle null bytes for the moment
        self.assertRaises(csv.Error, self._read_test,
                          ['ab\0c'], None, strict=1)
        self._read_test(['"ab"c'], [['abc']], doublequote=0)

    def test_read_eol(self):
        self._read_test(['a,b'], [['a', 'b']])
        self._read_test(['a,b\n'], [['a', 'b']])
        self._read_test(['a,b\r\n'], [['a', 'b']])
        self._read_test(['a,b\r'], [['a', 'b']])
        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])

    def test_read_eof(self):
        self._read_test(['a,"'], [['a', '']])
        self._read_test(['"a'], [['a']])
        self._read_test(['^'], [['\n']], escapechar='^')
        self.assertRaises(csv.Error, self._read_test, ['a,"'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test, ['"a'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test,
                          ['^'], [], escapechar='^', strict=True)

    def test_read_escape(self):
        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            self._read_test([bigline], [[bigstring, bigstring]])
            csv.field_size_limit(size)
            self._read_test([bigline], [[bigstring, bigstring]])
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size-1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # Restore the global limit so other tests are unaffected.
            csv.field_size_limit(limit)

    def test_read_linenum(self):
        for r in (csv.reader(['line,1', 'line,2', 'line,3']),
                  csv.DictReader(['line,1', 'line,2', 'line,3'],
                                 fieldnames=['a', 'b', 'c'])):
            self.assertEqual(r.line_num, 0)
            next(r)
            self.assertEqual(r.line_num, 1)
            next(r)
            self.assertEqual(r.line_num, 2)
            next(r)
            self.assertEqual(r.line_num, 3)
            # Exhausting the reader must not advance the line counter.
            self.assertRaises(StopIteration, next, r)
            self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        with _temp_binary_file() as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            for i, row in enumerate(csv.reader(fileobj)):
                self.assertEqual(row, rows[i])


class TestDialectRegistry(unittest.TestCase):
    """Tests for registering, looking up and applying csv dialects."""
    def test_registry_badargs(self):
        self.assertRaises(TypeError, csv.list_dialects, None)
        self.assertRaises(TypeError, csv.get_dialect)
        self.assertRaises(csv.Error, csv.get_dialect, None)
        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.unregister_dialect)
        self.assertRaises(csv.Error, csv.unregister_dialect, None)
        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.register_dialect, None)
        self.assertRaises(TypeError, csv.register_dialect, None, None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          badargument=None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          quoting=None)
        self.assertRaises(TypeError, csv.register_dialect, [])

    def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)

    def test_register_kwargs(self):
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, ';')
        self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))

    def test_incomplete_dialect(self):
        class myexceltsv(csv.Dialect):
            delimiter = "\t"
        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        with _temp_binary_file() as fileobj:
            fileobj.write("abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            rdr = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])

    def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"

        csv.register_dialect('testC', testC)
        self.addCleanup(csv.unregister_dialect, 'testC')

        def check(expected, *args, **kwargs):
            # Write [1, 2, 3] with csv.writer(fileobj, *args, **kwargs) into
            # a fresh temporary file and compare the raw output.
            with _temp_binary_file() as fileobj:
                writer = csv.writer(fileobj, *args, **kwargs)
                writer.writerow([1, 2, 3])
                fileobj.seek(0)
                self.assertEqual(fileobj.read(), expected)

        # default dialect
        check("1,2,3\r\n")
        # dialect class passed positionally
        check("1\t2\t3\r\n", testA)
        # dialect instance passed by keyword
        check("1:2:3\r\n", dialect=testB())
        # registered dialect referenced by name
        check("1|2|3\r\n", dialect='testC')
        # an explicit keyword overrides the dialect's attribute
        check("1;2;3\r\n", dialect=testA, delimiter=';')

    def test_bad_dialect(self):
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr=0)
        # Bad values
        self.assertRaises(TypeError, csv.reader, [], delimiter=None)
        self.assertRaises(TypeError, csv.reader, [], quoting=-1)
        self.assertRaises(TypeError, csv.reader, [], quoting=100)


class TestCsvBase(unittest.TestCase):
    """Shared read/write assertions; subclasses provide a ``dialect`` attribute."""
    def readerAssertEqual(self, input, expected_result):
        """Write input to a temp file, parse it with self.dialect, compare rows."""
        with _temp_binary_file() as fileobj:
            fileobj.write(input)
            fileobj.seek(0)
            reader = csv.reader(fileobj, dialect=self.dialect)
            fields = list(reader)
            self.assertEqual(fields, expected_result)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in input with self.dialect and compare the raw output."""
        with _temp_binary_file() as fileobj:
            writer = csv.writer(fileobj, dialect=self.dialect)
            writer.writerows(input)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected_result)


class TestDialectExcel(TestCsvBase):
    """Reader and writer behaviour of the built-in 'excel' dialect."""
    dialect = 'excel'

    def test_single(self):
        self.readerAssertEqual('abc', [['abc']])

    def test_simple(self):
        self.readerAssertEqual('1,2,3,4,5', [['1', '2', '3', '4', '5']])

    def test_blankline(self):
        self.readerAssertEqual('', [])

    def test_empty_fields(self):
        self.readerAssertEqual(',', [['', '']])

    def test_singlequoted(self):
        self.readerAssertEqual('""', [['']])

    def test_singlequoted_left_empty(self):
        self.readerAssertEqual('"",', [['', '']])

    def test_singlequoted_right_empty(self):
        self.readerAssertEqual(',""', [['', '']])

    def test_single_quoted_quote(self):
        self.readerAssertEqual('""""', [['"']])

    def test_quoted_quotes(self):
        self.readerAssertEqual('""""""', [['""']])

    def test_inline_quote(self):
        self.readerAssertEqual('a""b', [['a""b']])

    def test_inline_quotes(self):
        self.readerAssertEqual('a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual('"a"b', [['ab']])

    def test_lone_quote(self):
        self.readerAssertEqual('a"b', [['a"b']])

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual('"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        self.readerAssertEqual(' "a"', [[' "a"']])

    def test_quoted(self):
        self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
                               [['1', '2', '3',
                                 'I think, therefore I am',
                                 '5', '6']])

    def test_quoted_quote(self):
        self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
                               [['1', '2', '3',
                                 '"I see," said the blind man',
                                 'as he picked up his hammer and saw']])

    def test_quoted_nl(self):
        input = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        self.readerAssertEqual(input,
                               [['1', '2', '3',
                                 '"I see,"\nsaid the blind man',
                                 'as he picked up his\nhammer and saw'],
                                ['9', '8', '7', '6']])

    def test_dubious_quote(self):
        self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])

    def test_null(self):
        self.writerAssertEqual([], '')

    def test_single_writer(self):
        self.writerAssertEqual([['abc']], 'abc\r\n')

    def test_simple_writer(self):
        self.writerAssertEqual([[1, 2, 'abc', 3, 4]], '1,2,abc,3,4\r\n')

    def test_quotes(self):
        self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_newlines(self):
        self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], '1,2,"a\nbc",3,4\r\n')


class EscapedExcel(csv.excel):
    """Excel dialect without quoting; special characters are backslash-escaped."""
    quoting = csv.QUOTE_NONE
    escapechar = '\\'


class TestEscapedExcel(TestCsvBase):
    """Round-trip checks for the EscapedExcel dialect."""
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        self.writerAssertEqual([['abc,def']], 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        self.readerAssertEqual('abc\\,def\r\n', [['abc,def']])


class QuotedEscapedExcel(csv.excel):
    """Excel dialect that quotes non-numeric fields and has an escape character."""
    quoting = csv.QUOTE_NONNUMERIC
    escapechar = '\\'


class TestQuotedEscapedExcel(TestCsvBase):
    """Round-trip checks for the QuotedEscapedExcel dialect."""
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        self.readerAssertEqual('"abc\\,def"\r\n', [['abc,def']])


class TestDictFields(unittest.TestCase):
    """Tests for csv.DictReader and csv.DictWriter."""
    ### "long"  means the row is longer than the number of fieldnames
    ### "short" means there are fewer elements in the row than fieldnames
    def test_write_simple_dict(self):
        # io.open (rather than os.fdopen) is used here deliberately, so the
        # DictWriter is exercised against an io-module file object.
        with _temp_binary_file(opener=io.open) as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline()  # header
            self.assertEqual(fileobj.read(), "10,,abc\r\n")

    def test_write_no_fields(self):
        fileobj = StringIO()
        self.assertRaises(TypeError, csv.DictWriter, fileobj)

    def test_read_dict_fields(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_no_fieldnames(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        with _temp_binary_file() as f:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f, fieldnames=next(csv.reader(f)))
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(next(reader),
                             {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_fieldnames_chain(self):
        import itertools
        with _temp_binary_file() as f:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f)
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_long(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            None: ["abc", "4", "5", "6"]})

    def test_read_long_with_rest(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})

    def test_read_long_with_rest_no_fieldnames(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                            "_rest": ["abc", "4", "5", "6"]})

    def test_read_short(self):
        with _temp_binary_file() as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": '4', "5": '5', "6": '6'})
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                            "4": 'DEFAULT', "5": 'DEFAULT',
                                            "6": 'DEFAULT'})

    def test_read_multi(self):
        sample = [
            '2147483648,43.0e12,17,abc,def\r\n',
            '147483648,43.0e2,17,abc,def\r\n',
            '47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        self.assertEqual(next(reader), {"i1": '2147483648',
                                        "float": '43.0e12',
                                        "i2": '17',
                                        "s1": 'abc',
                                        "s2": 'def'})

    def test_read_with_blanks(self):
        reader = csv.DictReader(["1,2,abc,4,5,6\r\n", "\r\n",
                                 "1,2,abc,4,5,6\r\n"],
                                fieldnames="1 2 3 4 5 6".split())
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})


class TestArrayWrites(unittest.TestCase):
    """Writing array.array objects as rows through csv.writer."""
    def _check_array_row(self, row, expected):
        """Write row with the 'excel' dialect and compare the raw file content."""
        with _temp_binary_file() as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(row)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)

    def test_int_write(self):
        import array
        contents = [(20-i) for i in range(20)]
        a = array.array('i', contents)
        self._check_array_row(a, ",".join([str(i) for i in a])+"\r\n")

    def test_double_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('d', contents)
        # Floats are expected in repr() form.
        self._check_array_row(a, ",".join([repr(i) for i in a])+"\r\n")

    def test_float_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('f', contents)
        # Floats are expected in repr() form.
        self._check_array_row(a, ",".join([repr(i) for i in a])+"\r\n")

    def test_char_write(self):
        import array
        import string
        a = array.array('c', string.letters)
        self._check_array_row(a, ",".join(a)+"\r\n")


class TestDialectValidity(unittest.TestCase):
    """Validation performed when a csv.Dialect subclass is instantiated."""
    def test_quoting(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        # A valid dialect must instantiate without raising.
        mydialect()

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        # Valid again once a quotechar is supplied.
        mydialect()

        mydialect.quotechar = "''"
        self.assertRaises(csv.Error, mydialect)

        mydialect.quotechar = 4
        self.assertRaises(csv.Error, mydialect)

    def test_delimiter(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        # A valid dialect must instantiate without raising.
        mydialect()

        mydialect.delimiter = ":::"
        self.assertRaises(csv.Error, mydialect)

        mydialect.delimiter = 4
        self.assertRaises(csv.Error, mydialect)

    def test_lineterminator(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        # A valid dialect must instantiate without raising.
        mydialect()

        # A multi-character line terminator is accepted.
        mydialect.lineterminator = ":::"
        mydialect()

        mydialect.lineterminator = 4
        self.assertRaises(csv.Error, mydialect)


class TestSniffer(unittest.TestCase):
    """Tests for csv.Sniffer dialect and header detection."""
    sample1 = """\
Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
Shark City, Glendale Heights, IL, 12/28/02, Prezence
Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
"""
    sample2 = """\
'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
"""
    header = '''\
"venue","city","state","date","performers"
'''
    sample3 = '''\
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
'''

    sample4 = '''\
2147483648;43.0e12;17;abc;def
147483648;43.0e2;17;abc;def
47483648;43.0;170;abc;def
'''

    sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    sample6 = "a|b|c\r\nd|e|f\r\n"
    sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"

    def test_has_header(self):
        sniffer = csv.Sniffer()
        self.assertEqual(sniffer.has_header(self.sample1), False)
        self.assertEqual(sniffer.has_header(self.header+self.sample1), True)

    def test_sniff(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.sample1)
        self.assertEqual(dialect.delimiter, ",")
        self.assertEqual(dialect.quotechar, '"')
        self.assertEqual(dialect.skipinitialspace, True)

        dialect = sniffer.sniff(self.sample2)
        self.assertEqual(dialect.delimiter, ":")
        self.assertEqual(dialect.quotechar, "'")
        self.assertEqual(dialect.skipinitialspace, False)

    def test_delimiters(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.sample3)
        # given that all three lines in sample3 are equal,
        # I think that any character could have been 'guessed' as the
        # delimiter, depending on dictionary order
        self.assertIn(dialect.delimiter, self.sample3)
        dialect = sniffer.sniff(self.sample3, delimiters="?,")
        self.assertEqual(dialect.delimiter, "?")
        dialect = sniffer.sniff(self.sample3, delimiters="/,")
        self.assertEqual(dialect.delimiter, "/")
        dialect = sniffer.sniff(self.sample4)
        self.assertEqual(dialect.delimiter, ";")
        dialect = sniffer.sniff(self.sample5)
        self.assertEqual(dialect.delimiter, "\t")
        dialect = sniffer.sniff(self.sample6)
        self.assertEqual(dialect.delimiter, "|")
        dialect = sniffer.sniff(self.sample7)
        self.assertEqual(dialect.delimiter, "|")
        self.assertEqual(dialect.quotechar, "'")

    def test_doublequote(self):
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(self.header)
        self.assertFalse(dialect.doublequote)
        dialect = sniffer.sniff(self.sample2)
        self.assertTrue(dialect.doublequote)


# The leak tests rely on sys.gettotalrefcount(); they are only defined when
# the interpreter provides it.
if not hasattr(sys, "gettotalrefcount"):
    if test_support.verbose:
        print("*** skipping leakage tests ***")
else:
    class NUL:
        """File-like sink that discards everything written to it."""
        def write(self, *args):
            pass
        writelines = write

    class TestLeaks(unittest.TestCase):
        """Reference-leak checks based on the interpreter's total refcount.

        Each test repeats an operation 20 times and looks at the change in
        total refcount between the last two iterations.
        """
        def test_create_read(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                delta = rc-lastrc
                lastrc = rc
            # if csv.reader() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_create_write(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            s = NUL()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.writer(s)
                csv.writer(s)
                csv.writer(s)
                delta = rc-lastrc
                lastrc = rc
            # if csv.writer() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_read(self):
            delta = 0
            rows = ["a,b,c\r\n"]*5
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                rdr = csv.reader(rows)
                for row in rdr:
                    pass
                delta = rc-lastrc
                lastrc = rc
            # if reader leaks during read, delta should be 5 or more
            self.assertLess(delta, 5)

        def test_write(self):
            delta = 0
            rows = [[1, 2, 3]]*5
            s = NUL()
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                writer = csv.writer(s)
                for row in rows:
                    writer.writerow(row)
                delta = rc-lastrc
                lastrc = rc
            # if writer leaks during write, last delta should be 5 or more
            self.assertLess(delta, 5)

# commented out for now - csv module doesn't yet support Unicode
# (the accented letters are spelled as latin-1 escapes so this file stays
# pure ASCII)
## class TestUnicode(unittest.TestCase):
##     def test_unicode_read(self):
##         import codecs
##         f = codecs.EncodedFile(StringIO("Martin von L\xf6wis,"
##                                         "Marc Andr\xe9 Lemburg,"
##                                         "Guido van Rossum,"
##                                         "Fran\xe7ois Pinard\r\n"),
##                                data_encoding='iso-8859-1')
##         reader = csv.reader(f)
##         self.assertEqual(list(reader), [[u"Martin von L\xf6wis",
##                                          u"Marc Andr\xe9 Lemburg",
##                                          u"Guido van Rossum",
##                                          u"Fran\xe7ois Pinardn"]])


def test_main():
    """Run every class in this module whose name starts with 'Test'."""
    mod = sys.modules[__name__]
    test_support.run_unittest(
        *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
    )


if __name__ == '__main__':
    test_main()