Home | History | Annotate | Download | only in test
      1 #-*- coding: ISO-8859-1 -*-
      2 # pysqlite2/test/dbapi.py: tests for DB-API compliance
      3 #
      4 # Copyright (C) 2004-2010 Gerhard Hring <gh (at] ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import unittest
     25 import sys
     26 import sqlite3 as sqlite
     27 from test import test_support
     28 try:
     29     import threading
     30 except ImportError:
     31     threading = None
     32 
     33 class ModuleTests(unittest.TestCase):
     34     def CheckAPILevel(self):
     35         self.assertEqual(sqlite.apilevel, "2.0",
     36                          "apilevel is %s, should be 2.0" % sqlite.apilevel)
     37 
     38     def CheckThreadSafety(self):
     39         self.assertEqual(sqlite.threadsafety, 1,
     40                          "threadsafety is %d, should be 1" % sqlite.threadsafety)
     41 
     42     def CheckParamStyle(self):
     43         self.assertEqual(sqlite.paramstyle, "qmark",
     44                          "paramstyle is '%s', should be 'qmark'" %
     45                          sqlite.paramstyle)
     46 
     47     def CheckWarning(self):
     48         self.assertTrue(issubclass(sqlite.Warning, StandardError),
     49                         "Warning is not a subclass of StandardError")
     50 
     51     def CheckError(self):
     52         self.assertTrue(issubclass(sqlite.Error, StandardError),
     53                         "Error is not a subclass of StandardError")
     54 
     55     def CheckInterfaceError(self):
     56         self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
     57                         "InterfaceError is not a subclass of Error")
     58 
     59     def CheckDatabaseError(self):
     60         self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
     61                         "DatabaseError is not a subclass of Error")
     62 
     63     def CheckDataError(self):
     64         self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
     65                         "DataError is not a subclass of DatabaseError")
     66 
     67     def CheckOperationalError(self):
     68         self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
     69                         "OperationalError is not a subclass of DatabaseError")
     70 
     71     def CheckIntegrityError(self):
     72         self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
     73                         "IntegrityError is not a subclass of DatabaseError")
     74 
     75     def CheckInternalError(self):
     76         self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
     77                         "InternalError is not a subclass of DatabaseError")
     78 
     79     def CheckProgrammingError(self):
     80         self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
     81                         "ProgrammingError is not a subclass of DatabaseError")
     82 
     83     def CheckNotSupportedError(self):
     84         self.assertTrue(issubclass(sqlite.NotSupportedError,
     85                                    sqlite.DatabaseError),
     86                         "NotSupportedError is not a subclass of DatabaseError")
     87 
     88 class ConnectionTests(unittest.TestCase):
     89     def setUp(self):
     90         self.cx = sqlite.connect(":memory:")
     91         cu = self.cx.cursor()
     92         cu.execute("create table test(id integer primary key, name text)")
     93         cu.execute("insert into test(name) values (?)", ("foo",))
     94 
     95     def tearDown(self):
     96         self.cx.close()
     97 
     98     def CheckCommit(self):
     99         self.cx.commit()
    100 
    101     def CheckCommitAfterNoChanges(self):
    102         """
    103         A commit should also work when no changes were made to the database.
    104         """
    105         self.cx.commit()
    106         self.cx.commit()
    107 
    108     def CheckRollback(self):
    109         self.cx.rollback()
    110 
    111     def CheckRollbackAfterNoChanges(self):
    112         """
    113         A rollback should also work when no changes were made to the database.
    114         """
    115         self.cx.rollback()
    116         self.cx.rollback()
    117 
    118     def CheckCursor(self):
    119         cu = self.cx.cursor()
    120 
    121     def CheckFailedOpen(self):
    122         YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
    123         try:
    124             con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
    125         except sqlite.OperationalError:
    126             return
    127         self.fail("should have raised an OperationalError")
    128 
    129     def CheckClose(self):
    130         self.cx.close()
    131 
    132     def CheckExceptions(self):
    133         # Optional DB-API extension.
    134         self.assertEqual(self.cx.Warning, sqlite.Warning)
    135         self.assertEqual(self.cx.Error, sqlite.Error)
    136         self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
    137         self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
    138         self.assertEqual(self.cx.DataError, sqlite.DataError)
    139         self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
    140         self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
    141         self.assertEqual(self.cx.InternalError, sqlite.InternalError)
    142         self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
    143         self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
    144 
    145 class CursorTests(unittest.TestCase):
    146     def setUp(self):
    147         self.cx = sqlite.connect(":memory:")
    148         self.cu = self.cx.cursor()
    149         self.cu.execute("create table test(id integer primary key, name text, income number)")
    150         self.cu.execute("insert into test(name) values (?)", ("foo",))
    151 
    152     def tearDown(self):
    153         self.cu.close()
    154         self.cx.close()
    155 
    156     def CheckExecuteNoArgs(self):
    157         self.cu.execute("delete from test")
    158 
    159     def CheckExecuteIllegalSql(self):
    160         try:
    161             self.cu.execute("select asdf")
    162             self.fail("should have raised an OperationalError")
    163         except sqlite.OperationalError:
    164             return
    165         except:
    166             self.fail("raised wrong exception")
    167 
    168     def CheckExecuteTooMuchSql(self):
    169         try:
    170             self.cu.execute("select 5+4; select 4+5")
    171             self.fail("should have raised a Warning")
    172         except sqlite.Warning:
    173             return
    174         except:
    175             self.fail("raised wrong exception")
    176 
    177     def CheckExecuteTooMuchSql2(self):
    178         self.cu.execute("select 5+4; -- foo bar")
    179 
    180     def CheckExecuteTooMuchSql3(self):
    181         self.cu.execute("""
    182             select 5+4;
    183 
    184             /*
    185             foo
    186             */
    187             """)
    188 
    189     def CheckExecuteWrongSqlArg(self):
    190         try:
    191             self.cu.execute(42)
    192             self.fail("should have raised a ValueError")
    193         except ValueError:
    194             return
    195         except:
    196             self.fail("raised wrong exception.")
    197 
    198     def CheckExecuteArgInt(self):
    199         self.cu.execute("insert into test(id) values (?)", (42,))
    200 
    201     def CheckExecuteArgFloat(self):
    202         self.cu.execute("insert into test(income) values (?)", (2500.32,))
    203 
    204     def CheckExecuteArgString(self):
    205         self.cu.execute("insert into test(name) values (?)", ("Hugo",))
    206 
    207     def CheckExecuteArgStringWithZeroByte(self):
    208         self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
    209 
    210         self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
    211         row = self.cu.fetchone()
    212         self.assertEqual(row[0], "Hu\x00go")
    213 
    214     def CheckExecuteWrongNoOfArgs1(self):
    215         # too many parameters
    216         try:
    217             self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
    218             self.fail("should have raised ProgrammingError")
    219         except sqlite.ProgrammingError:
    220             pass
    221 
    222     def CheckExecuteWrongNoOfArgs2(self):
    223         # too little parameters
    224         try:
    225             self.cu.execute("insert into test(id) values (?)")
    226             self.fail("should have raised ProgrammingError")
    227         except sqlite.ProgrammingError:
    228             pass
    229 
    230     def CheckExecuteWrongNoOfArgs3(self):
    231         # no parameters, parameters are needed
    232         try:
    233             self.cu.execute("insert into test(id) values (?)")
    234             self.fail("should have raised ProgrammingError")
    235         except sqlite.ProgrammingError:
    236             pass
    237 
    238     def CheckExecuteParamList(self):
    239         self.cu.execute("insert into test(name) values ('foo')")
    240         self.cu.execute("select name from test where name=?", ["foo"])
    241         row = self.cu.fetchone()
    242         self.assertEqual(row[0], "foo")
    243 
    244     def CheckExecuteParamSequence(self):
    245         class L(object):
    246             def __len__(self):
    247                 return 1
    248             def __getitem__(self, x):
    249                 assert x == 0
    250                 return "foo"
    251 
    252         self.cu.execute("insert into test(name) values ('foo')")
    253         self.cu.execute("select name from test where name=?", L())
    254         row = self.cu.fetchone()
    255         self.assertEqual(row[0], "foo")
    256 
    257     def CheckExecuteDictMapping(self):
    258         self.cu.execute("insert into test(name) values ('foo')")
    259         self.cu.execute("select name from test where name=:name", {"name": "foo"})
    260         row = self.cu.fetchone()
    261         self.assertEqual(row[0], "foo")
    262 
    263     def CheckExecuteDictMapping_Mapping(self):
    264         # Test only works with Python 2.5 or later
    265         if sys.version_info < (2, 5, 0):
    266             return
    267 
    268         class D(dict):
    269             def __missing__(self, key):
    270                 return "foo"
    271 
    272         self.cu.execute("insert into test(name) values ('foo')")
    273         self.cu.execute("select name from test where name=:name", D())
    274         row = self.cu.fetchone()
    275         self.assertEqual(row[0], "foo")
    276 
    277     def CheckExecuteDictMappingTooLittleArgs(self):
    278         self.cu.execute("insert into test(name) values ('foo')")
    279         try:
    280             self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
    281             self.fail("should have raised ProgrammingError")
    282         except sqlite.ProgrammingError:
    283             pass
    284 
    285     def CheckExecuteDictMappingNoArgs(self):
    286         self.cu.execute("insert into test(name) values ('foo')")
    287         try:
    288             self.cu.execute("select name from test where name=:name")
    289             self.fail("should have raised ProgrammingError")
    290         except sqlite.ProgrammingError:
    291             pass
    292 
    293     def CheckExecuteDictMappingUnnamed(self):
    294         self.cu.execute("insert into test(name) values ('foo')")
    295         try:
    296             self.cu.execute("select name from test where name=?", {"name": "foo"})
    297             self.fail("should have raised ProgrammingError")
    298         except sqlite.ProgrammingError:
    299             pass
    300 
    301     def CheckClose(self):
    302         self.cu.close()
    303 
    304     def CheckRowcountExecute(self):
    305         self.cu.execute("delete from test")
    306         self.cu.execute("insert into test(name) values ('foo')")
    307         self.cu.execute("insert into test(name) values ('foo')")
    308         self.cu.execute("update test set name='bar'")
    309         self.assertEqual(self.cu.rowcount, 2)
    310 
    311     def CheckRowcountSelect(self):
    312         """
    313         pysqlite does not know the rowcount of SELECT statements, because we
    314         don't fetch all rows after executing the select statement. The rowcount
    315         has thus to be -1.
    316         """
    317         self.cu.execute("select 5 union select 6")
    318         self.assertEqual(self.cu.rowcount, -1)
    319 
    320     def CheckRowcountExecutemany(self):
    321         self.cu.execute("delete from test")
    322         self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
    323         self.assertEqual(self.cu.rowcount, 3)
    324 
    325     def CheckTotalChanges(self):
    326         self.cu.execute("insert into test(name) values ('foo')")
    327         self.cu.execute("insert into test(name) values ('foo')")
    328         if self.cx.total_changes < 2:
    329             self.fail("total changes reported wrong value")
    330 
    331     # Checks for executemany:
    332     # Sequences are required by the DB-API, iterators
    333     # enhancements in pysqlite.
    334 
    335     def CheckExecuteManySequence(self):
    336         self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
    337 
    338     def CheckExecuteManyIterator(self):
    339         class MyIter:
    340             def __init__(self):
    341                 self.value = 5
    342 
    343             def next(self):
    344                 if self.value == 10:
    345                     raise StopIteration
    346                 else:
    347                     self.value += 1
    348                     return (self.value,)
    349 
    350         self.cu.executemany("insert into test(income) values (?)", MyIter())
    351 
    352     def CheckExecuteManyGenerator(self):
    353         def mygen():
    354             for i in range(5):
    355                 yield (i,)
    356 
    357         self.cu.executemany("insert into test(income) values (?)", mygen())
    358 
    359     def CheckExecuteManyWrongSqlArg(self):
    360         try:
    361             self.cu.executemany(42, [(3,)])
    362             self.fail("should have raised a ValueError")
    363         except ValueError:
    364             return
    365         except:
    366             self.fail("raised wrong exception.")
    367 
    368     def CheckExecuteManySelect(self):
    369         try:
    370             self.cu.executemany("select ?", [(3,)])
    371             self.fail("should have raised a ProgrammingError")
    372         except sqlite.ProgrammingError:
    373             return
    374         except:
    375             self.fail("raised wrong exception.")
    376 
    377     def CheckExecuteManyNotIterable(self):
    378         try:
    379             self.cu.executemany("insert into test(income) values (?)", 42)
    380             self.fail("should have raised a TypeError")
    381         except TypeError:
    382             return
    383         except Exception, e:
    384             print "raised", e.__class__
    385             self.fail("raised wrong exception.")
    386 
    387     def CheckFetchIter(self):
    388         # Optional DB-API extension.
    389         self.cu.execute("delete from test")
    390         self.cu.execute("insert into test(id) values (?)", (5,))
    391         self.cu.execute("insert into test(id) values (?)", (6,))
    392         self.cu.execute("select id from test order by id")
    393         lst = []
    394         for row in self.cu:
    395             lst.append(row[0])
    396         self.assertEqual(lst[0], 5)
    397         self.assertEqual(lst[1], 6)
    398 
    399     def CheckFetchone(self):
    400         self.cu.execute("select name from test")
    401         row = self.cu.fetchone()
    402         self.assertEqual(row[0], "foo")
    403         row = self.cu.fetchone()
    404         self.assertEqual(row, None)
    405 
    406     def CheckFetchoneNoStatement(self):
    407         cur = self.cx.cursor()
    408         row = cur.fetchone()
    409         self.assertEqual(row, None)
    410 
    411     def CheckArraySize(self):
    412         # must default ot 1
    413         self.assertEqual(self.cu.arraysize, 1)
    414 
    415         # now set to 2
    416         self.cu.arraysize = 2
    417 
    418         # now make the query return 3 rows
    419         self.cu.execute("delete from test")
    420         self.cu.execute("insert into test(name) values ('A')")
    421         self.cu.execute("insert into test(name) values ('B')")
    422         self.cu.execute("insert into test(name) values ('C')")
    423         self.cu.execute("select name from test")
    424         res = self.cu.fetchmany()
    425 
    426         self.assertEqual(len(res), 2)
    427 
    428     def CheckFetchmany(self):
    429         self.cu.execute("select name from test")
    430         res = self.cu.fetchmany(100)
    431         self.assertEqual(len(res), 1)
    432         res = self.cu.fetchmany(100)
    433         self.assertEqual(res, [])
    434 
    435     def CheckFetchmanyKwArg(self):
    436         """Checks if fetchmany works with keyword arguments"""
    437         self.cu.execute("select name from test")
    438         res = self.cu.fetchmany(size=100)
    439         self.assertEqual(len(res), 1)
    440 
    441     def CheckFetchall(self):
    442         self.cu.execute("select name from test")
    443         res = self.cu.fetchall()
    444         self.assertEqual(len(res), 1)
    445         res = self.cu.fetchall()
    446         self.assertEqual(res, [])
    447 
    448     def CheckSetinputsizes(self):
    449         self.cu.setinputsizes([3, 4, 5])
    450 
    451     def CheckSetoutputsize(self):
    452         self.cu.setoutputsize(5, 0)
    453 
    454     def CheckSetoutputsizeNoColumn(self):
    455         self.cu.setoutputsize(42)
    456 
    457     def CheckCursorConnection(self):
    458         # Optional DB-API extension.
    459         self.assertEqual(self.cu.connection, self.cx)
    460 
    461     def CheckWrongCursorCallable(self):
    462         try:
    463             def f(): pass
    464             cur = self.cx.cursor(f)
    465             self.fail("should have raised a TypeError")
    466         except TypeError:
    467             return
    468         self.fail("should have raised a ValueError")
    469 
    470     def CheckCursorWrongClass(self):
    471         class Foo: pass
    472         foo = Foo()
    473         try:
    474             cur = sqlite.Cursor(foo)
    475             self.fail("should have raised a ValueError")
    476         except TypeError:
    477             pass
    478 
    479 @unittest.skipUnless(threading, 'This test requires threading.')
    480 class ThreadTests(unittest.TestCase):
    481     def setUp(self):
    482         self.con = sqlite.connect(":memory:")
    483         self.cur = self.con.cursor()
    484         self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
    485 
    486     def tearDown(self):
    487         self.cur.close()
    488         self.con.close()
    489 
    490     def CheckConCursor(self):
    491         def run(con, errors):
    492             try:
    493                 cur = con.cursor()
    494                 errors.append("did not raise ProgrammingError")
    495                 return
    496             except sqlite.ProgrammingError:
    497                 return
    498             except:
    499                 errors.append("raised wrong exception")
    500 
    501         errors = []
    502         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    503         t.start()
    504         t.join()
    505         if len(errors) > 0:
    506             self.fail("\n".join(errors))
    507 
    508     def CheckConCommit(self):
    509         def run(con, errors):
    510             try:
    511                 con.commit()
    512                 errors.append("did not raise ProgrammingError")
    513                 return
    514             except sqlite.ProgrammingError:
    515                 return
    516             except:
    517                 errors.append("raised wrong exception")
    518 
    519         errors = []
    520         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    521         t.start()
    522         t.join()
    523         if len(errors) > 0:
    524             self.fail("\n".join(errors))
    525 
    526     def CheckConRollback(self):
    527         def run(con, errors):
    528             try:
    529                 con.rollback()
    530                 errors.append("did not raise ProgrammingError")
    531                 return
    532             except sqlite.ProgrammingError:
    533                 return
    534             except:
    535                 errors.append("raised wrong exception")
    536 
    537         errors = []
    538         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    539         t.start()
    540         t.join()
    541         if len(errors) > 0:
    542             self.fail("\n".join(errors))
    543 
    544     def CheckConClose(self):
    545         def run(con, errors):
    546             try:
    547                 con.close()
    548                 errors.append("did not raise ProgrammingError")
    549                 return
    550             except sqlite.ProgrammingError:
    551                 return
    552             except:
    553                 errors.append("raised wrong exception")
    554 
    555         errors = []
    556         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    557         t.start()
    558         t.join()
    559         if len(errors) > 0:
    560             self.fail("\n".join(errors))
    561 
    562     def CheckCurImplicitBegin(self):
    563         def run(cur, errors):
    564             try:
    565                 cur.execute("insert into test(name) values ('a')")
    566                 errors.append("did not raise ProgrammingError")
    567                 return
    568             except sqlite.ProgrammingError:
    569                 return
    570             except:
    571                 errors.append("raised wrong exception")
    572 
    573         errors = []
    574         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    575         t.start()
    576         t.join()
    577         if len(errors) > 0:
    578             self.fail("\n".join(errors))
    579 
    580     def CheckCurClose(self):
    581         def run(cur, errors):
    582             try:
    583                 cur.close()
    584                 errors.append("did not raise ProgrammingError")
    585                 return
    586             except sqlite.ProgrammingError:
    587                 return
    588             except:
    589                 errors.append("raised wrong exception")
    590 
    591         errors = []
    592         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    593         t.start()
    594         t.join()
    595         if len(errors) > 0:
    596             self.fail("\n".join(errors))
    597 
    598     def CheckCurExecute(self):
    599         def run(cur, errors):
    600             try:
    601                 cur.execute("select name from test")
    602                 errors.append("did not raise ProgrammingError")
    603                 return
    604             except sqlite.ProgrammingError:
    605                 return
    606             except:
    607                 errors.append("raised wrong exception")
    608 
    609         errors = []
    610         self.cur.execute("insert into test(name) values ('a')")
    611         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    612         t.start()
    613         t.join()
    614         if len(errors) > 0:
    615             self.fail("\n".join(errors))
    616 
    617     def CheckCurIterNext(self):
    618         def run(cur, errors):
    619             try:
    620                 row = cur.fetchone()
    621                 errors.append("did not raise ProgrammingError")
    622                 return
    623             except sqlite.ProgrammingError:
    624                 return
    625             except:
    626                 errors.append("raised wrong exception")
    627 
    628         errors = []
    629         self.cur.execute("insert into test(name) values ('a')")
    630         self.cur.execute("select name from test")
    631         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    632         t.start()
    633         t.join()
    634         if len(errors) > 0:
    635             self.fail("\n".join(errors))
    636 
    637 class ConstructorTests(unittest.TestCase):
    638     def CheckDate(self):
    639         d = sqlite.Date(2004, 10, 28)
    640 
    641     def CheckTime(self):
    642         t = sqlite.Time(12, 39, 35)
    643 
    644     def CheckTimestamp(self):
    645         ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
    646 
    647     def CheckDateFromTicks(self):
    648         d = sqlite.DateFromTicks(42)
    649 
    650     def CheckTimeFromTicks(self):
    651         t = sqlite.TimeFromTicks(42)
    652 
    653     def CheckTimestampFromTicks(self):
    654         ts = sqlite.TimestampFromTicks(42)
    655 
    656     def CheckBinary(self):
    657         with test_support.check_py3k_warnings():
    658             b = sqlite.Binary(chr(0) + "'")
    659 
    660 class ExtensionTests(unittest.TestCase):
    661     def CheckScriptStringSql(self):
    662         con = sqlite.connect(":memory:")
    663         cur = con.cursor()
    664         cur.executescript("""
    665             -- bla bla
    666             /* a stupid comment */
    667             create table a(i);
    668             insert into a(i) values (5);
    669             """)
    670         cur.execute("select i from a")
    671         res = cur.fetchone()[0]
    672         self.assertEqual(res, 5)
    673 
    674     def CheckScriptStringUnicode(self):
    675         con = sqlite.connect(":memory:")
    676         cur = con.cursor()
    677         cur.executescript(u"""
    678             create table a(i);
    679             insert into a(i) values (5);
    680             select i from a;
    681             delete from a;
    682             insert into a(i) values (6);
    683             """)
    684         cur.execute("select i from a")
    685         res = cur.fetchone()[0]
    686         self.assertEqual(res, 6)
    687 
    688     def CheckScriptSyntaxError(self):
    689         con = sqlite.connect(":memory:")
    690         cur = con.cursor()
    691         raised = False
    692         try:
    693             cur.executescript("create table test(x); asdf; create table test2(x)")
    694         except sqlite.OperationalError:
    695             raised = True
    696         self.assertEqual(raised, True, "should have raised an exception")
    697 
    698     def CheckScriptErrorNormal(self):
    699         con = sqlite.connect(":memory:")
    700         cur = con.cursor()
    701         raised = False
    702         try:
    703             cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
    704         except sqlite.OperationalError:
    705             raised = True
    706         self.assertEqual(raised, True, "should have raised an exception")
    707 
    708     def CheckConnectionExecute(self):
    709         con = sqlite.connect(":memory:")
    710         result = con.execute("select 5").fetchone()[0]
    711         self.assertEqual(result, 5, "Basic test of Connection.execute")
    712 
    713     def CheckConnectionExecutemany(self):
    714         con = sqlite.connect(":memory:")
    715         con.execute("create table test(foo)")
    716         con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
    717         result = con.execute("select foo from test order by foo").fetchall()
    718         self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
    719         self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
    720 
    721     def CheckConnectionExecutescript(self):
    722         con = sqlite.connect(":memory:")
    723         con.executescript("create table test(foo); insert into test(foo) values (5);")
    724         result = con.execute("select foo from test").fetchone()[0]
    725         self.assertEqual(result, 5, "Basic test of Connection.executescript")
    726 
    727 class ClosedConTests(unittest.TestCase):
    728     def setUp(self):
    729         pass
    730 
    731     def tearDown(self):
    732         pass
    733 
    734     def CheckClosedConCursor(self):
    735         con = sqlite.connect(":memory:")
    736         con.close()
    737         try:
    738             cur = con.cursor()
    739             self.fail("Should have raised a ProgrammingError")
    740         except sqlite.ProgrammingError:
    741             pass
    742         except:
    743             self.fail("Should have raised a ProgrammingError")
    744 
    745     def CheckClosedConCommit(self):
    746         con = sqlite.connect(":memory:")
    747         con.close()
    748         try:
    749             con.commit()
    750             self.fail("Should have raised a ProgrammingError")
    751         except sqlite.ProgrammingError:
    752             pass
    753         except:
    754             self.fail("Should have raised a ProgrammingError")
    755 
    756     def CheckClosedConRollback(self):
    757         con = sqlite.connect(":memory:")
    758         con.close()
    759         try:
    760             con.rollback()
    761             self.fail("Should have raised a ProgrammingError")
    762         except sqlite.ProgrammingError:
    763             pass
    764         except:
    765             self.fail("Should have raised a ProgrammingError")
    766 
    767     def CheckClosedCurExecute(self):
    768         con = sqlite.connect(":memory:")
    769         cur = con.cursor()
    770         con.close()
    771         try:
    772             cur.execute("select 4")
    773             self.fail("Should have raised a ProgrammingError")
    774         except sqlite.ProgrammingError:
    775             pass
    776         except:
    777             self.fail("Should have raised a ProgrammingError")
    778 
    779     def CheckClosedCreateFunction(self):
    780         con = sqlite.connect(":memory:")
    781         con.close()
    782         def f(x): return 17
    783         try:
    784             con.create_function("foo", 1, f)
    785             self.fail("Should have raised a ProgrammingError")
    786         except sqlite.ProgrammingError:
    787             pass
    788         except:
    789             self.fail("Should have raised a ProgrammingError")
    790 
    791     def CheckClosedCreateAggregate(self):
    792         con = sqlite.connect(":memory:")
    793         con.close()
    794         class Agg:
    795             def __init__(self):
    796                 pass
    797             def step(self, x):
    798                 pass
    799             def finalize(self):
    800                 return 17
    801         try:
    802             con.create_aggregate("foo", 1, Agg)
    803             self.fail("Should have raised a ProgrammingError")
    804         except sqlite.ProgrammingError:
    805             pass
    806         except:
    807             self.fail("Should have raised a ProgrammingError")
    808 
    809     def CheckClosedSetAuthorizer(self):
    810         con = sqlite.connect(":memory:")
    811         con.close()
    812         def authorizer(*args):
    813             return sqlite.DENY
    814         try:
    815             con.set_authorizer(authorizer)
    816             self.fail("Should have raised a ProgrammingError")
    817         except sqlite.ProgrammingError:
    818             pass
    819         except:
    820             self.fail("Should have raised a ProgrammingError")
    821 
    822     def CheckClosedSetProgressCallback(self):
    823         con = sqlite.connect(":memory:")
    824         con.close()
    825         def progress(): pass
    826         try:
    827             con.set_progress_handler(progress, 100)
    828             self.fail("Should have raised a ProgrammingError")
    829         except sqlite.ProgrammingError:
    830             pass
    831         except:
    832             self.fail("Should have raised a ProgrammingError")
    833 
    834     def CheckClosedCall(self):
    835         con = sqlite.connect(":memory:")
    836         con.close()
    837         try:
    838             con()
    839             self.fail("Should have raised a ProgrammingError")
    840         except sqlite.ProgrammingError:
    841             pass
    842         except:
    843             self.fail("Should have raised a ProgrammingError")
    844 
    845 class ClosedCurTests(unittest.TestCase):
    846     def setUp(self):
    847         pass
    848 
    849     def tearDown(self):
    850         pass
    851 
    852     def CheckClosed(self):
    853         con = sqlite.connect(":memory:")
    854         cur = con.cursor()
    855         cur.close()
    856 
    857         for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
    858             if method_name in ("execute", "executescript"):
    859                 params = ("select 4 union select 5",)
    860             elif method_name == "executemany":
    861                 params = ("insert into foo(bar) values (?)", [(3,), (4,)])
    862             else:
    863                 params = []
    864 
    865             try:
    866                 method = getattr(cur, method_name)
    867 
    868                 method(*params)
    869                 self.fail("Should have raised a ProgrammingError: method " + method_name)
    870             except sqlite.ProgrammingError:
    871                 pass
    872             except:
    873                 self.fail("Should have raised a ProgrammingError: " + method_name)
    874 
    875 def suite():
    876     module_suite = unittest.makeSuite(ModuleTests, "Check")
    877     connection_suite = unittest.makeSuite(ConnectionTests, "Check")
    878     cursor_suite = unittest.makeSuite(CursorTests, "Check")
    879     thread_suite = unittest.makeSuite(ThreadTests, "Check")
    880     constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
    881     ext_suite = unittest.makeSuite(ExtensionTests, "Check")
    882     closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
    883     closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
    884     return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
    885 
    886 def test():
    887     runner = unittest.TextTestRunner()
    888     runner.run(suite())
    889 
    890 if __name__ == "__main__":
    891     test()
    892