#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import unittest
import sys
import sqlite3 as sqlite
try:
    import threading
except ImportError:
    threading = None

# Python 3 has no StandardError; Exception is the closest common base class.
# On Python 2 the builtin is found and the checks below are unchanged.
try:
    StandardError
except NameError:
    StandardError = Exception


class ModuleTests(unittest.TestCase):
    """Module-level globals and the exception hierarchy required by DB-API 2.0."""

    def CheckAPILevel(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def CheckThreadSafety(self):
        self.assertEqual(sqlite.threadsafety, 1,
                         "threadsafety is %d, should be 1" % sqlite.threadsafety)

    def CheckParamStyle(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    def CheckWarning(self):
        self.assertTrue(issubclass(sqlite.Warning, StandardError),
                        "Warning is not a subclass of StandardError")

    def CheckError(self):
        self.assertTrue(issubclass(sqlite.Error, StandardError),
                        "Error is not a subclass of StandardError")

    def CheckInterfaceError(self):
        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
                        "InterfaceError is not a subclass of Error")

    def CheckDatabaseError(self):
        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
                        "DatabaseError is not a subclass of Error")

    def CheckDataError(self):
        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
                        "DataError is not a subclass of DatabaseError")

    def CheckOperationalError(self):
        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
                        "OperationalError is not a subclass of DatabaseError")

    def CheckIntegrityError(self):
        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
                        "IntegrityError is not a subclass of DatabaseError")

    def CheckInternalError(self):
        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
                        "InternalError is not a subclass of DatabaseError")

    def CheckProgrammingError(self):
        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
                        "ProgrammingError is not a subclass of DatabaseError")

    def CheckNotSupportedError(self):
        self.assertTrue(issubclass(sqlite.NotSupportedError,
                                   sqlite.DatabaseError),
                        "NotSupportedError is not a subclass of DatabaseError")


class ConnectionTests(unittest.TestCase):
    """Basic Connection behaviour on an in-memory database holding one row."""

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        self.cx.cursor()

    def CheckFailedOpen(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        # tearDown() closes the connection a second time; that must be harmless.
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)


class CursorTests(unittest.TestCase):
    """Cursor behaviour: execute*, parameter binding, rowcount and fetch*."""

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    # The checks below use assertRaises instead of try/self.fail()/bare
    # "except:".  The bare except also caught the AssertionError raised by
    # self.fail() and replaced its message with "raised wrong exception".

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, one value.  (This check used
        # to be a verbatim copy of CheckExecuteWrongNoOfArgs3.)
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Minimal object that only implements the sequence protocol.
        class L(object):
            def __len__(self):
                return 1

            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # The empty dict subclass answers every lookup through __missing__.
        # (The former "Python < 2.5" guard was dead code: this file already
        # needs unittest.skipUnless, i.e. Python 2.7 or later.)
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        # tearDown() closes the cursor a second time; that must be harmless.
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertGreaterEqual(self.cx.total_changes, 2,
                                "total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API, iterators are
    # enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        class MyIter:
            def __init__(self):
                self.value = 5

            def __iter__(self):
                return self

            def next(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

            # Python 3 spelling of the iterator protocol.
            __next__ = next

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        # Any other exception propagates and is reported by unittest together
        # with its type and traceback.
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = [row[0] for row in self.cu]
        self.assertEqual(lst, [5, 6])

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertIsNone(row)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertIsNone(row)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        # A factory that does not produce a cursor must be rejected.
        def f():
            pass

        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        class Foo:
            pass

        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)


@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
    """Connections and cursors must reject use from a thread other than their creator."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _check_rejected_in_other_thread(self, operation, *args):
        # Call operation(*args) in a freshly started thread and fail the test
        # unless that call raises sqlite.ProgrammingError.  Problems are
        # collected in a list because an exception raised inside the worker
        # thread would not reach the test runner.
        errors = []

        def run():
            try:
                operation(*args)
            except sqlite.ProgrammingError:
                return
            except Exception:
                errors.append("raised wrong exception")
            else:
                errors.append("did not raise ProgrammingError")

        t = threading.Thread(target=run)
        t.start()
        t.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._check_rejected_in_other_thread(self.con.cursor)

    def CheckConCommit(self):
        self._check_rejected_in_other_thread(self.con.commit)

    def CheckConRollback(self):
        self._check_rejected_in_other_thread(self.con.rollback)

    def CheckConClose(self):
        self._check_rejected_in_other_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._check_rejected_in_other_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._check_rejected_in_other_thread(self.cur.close)

    def CheckCurExecute(self):
        self.cur.execute("insert into test(name) values ('a')")
        self._check_rejected_in_other_thread(
            self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._check_rejected_in_other_thread(self.cur.fetchone)


class ConstructorTests(unittest.TestCase):
    """The DB-API type constructors must be callable with valid arguments."""

    def CheckDate(self):
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        # Same two bytes as chr(0) + "'" on Python 2, and a real bytes object
        # on Python 3.
        sqlite.Binary(b"\x00'")


class ExtensionTests(unittest.TestCase):
    """pysqlite extensions: executescript and the Connection.execute* shortcuts."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        # The individual checks used to leave their connections open.
        self.con.close()

    def CheckScriptStringSql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptStringUnicode(self):
        cur = self.con.cursor()
        cur.executescript(u"""
            create table a(i);
            insert into a(i) values (5);
            select i from a;
            delete from a;
            insert into a(i) values (6);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 6)

    def CheckScriptSyntaxError(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        con = self.con
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        con = self.con
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")


class ClosedConTests(unittest.TestCase):
    """Every operation on a closed connection must raise ProgrammingError."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        # The cursor has to be created while the connection is still open.
        self.cur = self.con.cursor()
        self.con.close()

    def CheckClosedConCursor(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.cursor()

    def CheckClosedConCommit(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.commit()

    def CheckClosedConRollback(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.rollback()

    def CheckClosedCurExecute(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cur.execute("select 4")

    def CheckClosedCreateFunction(self):
        def f(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            self.con.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            self.con.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        def authorizer(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            self.con.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        def progress():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            self.con.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con()


class ClosedCurTests(unittest.TestCase):
    """Every operation on a closed cursor must raise ProgrammingError."""

    def CheckClosed(self):
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
            if method_name in ("execute", "executescript"):
                params = ("select 4 union select 5",)
            elif method_name == "executemany":
                params = ("insert into foo(bar) values (?)", [(3,), (4,)])
            else:
                params = []

            method = getattr(cur, method_name)
            try:
                method(*params)
            except sqlite.ProgrammingError:
                continue
            # Reached only when the call returned normally; any other
            # exception propagates and is reported with its traceback.
            self.fail("Should have raised a ProgrammingError: method " + method_name)


def suite():
    """Return one TestSuite holding every ``Check*`` method of this module.

    A TestLoader with a custom method prefix is used in place of
    unittest.makeSuite(), which newer Python versions no longer provide.
    """
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_cases = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
    )
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(case) for case in test_cases])


def test():
    """Run the complete suite with a plain text runner."""
    runner = unittest.TextTestRunner()
    runner.run(suite())


if __name__ == "__main__":
    test()