Home | History | Annotate | Download | only in test
      1 #-*- coding: ISO-8859-1 -*-
      2 # pysqlite2/test/dbapi.py: tests for DB-API compliance
      3 #
      4 # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import unittest
     25 import sys
     26 import sqlite3 as sqlite
     27 try:
     28     import threading
     29 except ImportError:
     30     threading = None
     31 
     32 class ModuleTests(unittest.TestCase):
     33     def CheckAPILevel(self):
     34         self.assertEqual(sqlite.apilevel, "2.0",
     35                          "apilevel is %s, should be 2.0" % sqlite.apilevel)
     36 
     37     def CheckThreadSafety(self):
     38         self.assertEqual(sqlite.threadsafety, 1,
     39                          "threadsafety is %d, should be 1" % sqlite.threadsafety)
     40 
     41     def CheckParamStyle(self):
     42         self.assertEqual(sqlite.paramstyle, "qmark",
     43                          "paramstyle is '%s', should be 'qmark'" %
     44                          sqlite.paramstyle)
     45 
     46     def CheckWarning(self):
     47         self.assertTrue(issubclass(sqlite.Warning, StandardError),
     48                         "Warning is not a subclass of StandardError")
     49 
     50     def CheckError(self):
     51         self.assertTrue(issubclass(sqlite.Error, StandardError),
     52                         "Error is not a subclass of StandardError")
     53 
     54     def CheckInterfaceError(self):
     55         self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
     56                         "InterfaceError is not a subclass of Error")
     57 
     58     def CheckDatabaseError(self):
     59         self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
     60                         "DatabaseError is not a subclass of Error")
     61 
     62     def CheckDataError(self):
     63         self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
     64                         "DataError is not a subclass of DatabaseError")
     65 
     66     def CheckOperationalError(self):
     67         self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
     68                         "OperationalError is not a subclass of DatabaseError")
     69 
     70     def CheckIntegrityError(self):
     71         self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
     72                         "IntegrityError is not a subclass of DatabaseError")
     73 
     74     def CheckInternalError(self):
     75         self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
     76                         "InternalError is not a subclass of DatabaseError")
     77 
     78     def CheckProgrammingError(self):
     79         self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
     80                         "ProgrammingError is not a subclass of DatabaseError")
     81 
     82     def CheckNotSupportedError(self):
     83         self.assertTrue(issubclass(sqlite.NotSupportedError,
     84                                    sqlite.DatabaseError),
     85                         "NotSupportedError is not a subclass of DatabaseError")
     86 
     87 class ConnectionTests(unittest.TestCase):
     88     def setUp(self):
     89         self.cx = sqlite.connect(":memory:")
     90         cu = self.cx.cursor()
     91         cu.execute("create table test(id integer primary key, name text)")
     92         cu.execute("insert into test(name) values (?)", ("foo",))
     93 
     94     def tearDown(self):
     95         self.cx.close()
     96 
     97     def CheckCommit(self):
     98         self.cx.commit()
     99 
    100     def CheckCommitAfterNoChanges(self):
    101         """
    102         A commit should also work when no changes were made to the database.
    103         """
    104         self.cx.commit()
    105         self.cx.commit()
    106 
    107     def CheckRollback(self):
    108         self.cx.rollback()
    109 
    110     def CheckRollbackAfterNoChanges(self):
    111         """
    112         A rollback should also work when no changes were made to the database.
    113         """
    114         self.cx.rollback()
    115         self.cx.rollback()
    116 
    117     def CheckCursor(self):
    118         cu = self.cx.cursor()
    119 
    120     def CheckFailedOpen(self):
    121         YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
    122         try:
    123             con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
    124         except sqlite.OperationalError:
    125             return
    126         self.fail("should have raised an OperationalError")
    127 
    128     def CheckClose(self):
    129         self.cx.close()
    130 
    131     def CheckExceptions(self):
    132         # Optional DB-API extension.
    133         self.assertEqual(self.cx.Warning, sqlite.Warning)
    134         self.assertEqual(self.cx.Error, sqlite.Error)
    135         self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
    136         self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
    137         self.assertEqual(self.cx.DataError, sqlite.DataError)
    138         self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
    139         self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
    140         self.assertEqual(self.cx.InternalError, sqlite.InternalError)
    141         self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
    142         self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
    143 
    144 class CursorTests(unittest.TestCase):
    145     def setUp(self):
    146         self.cx = sqlite.connect(":memory:")
    147         self.cu = self.cx.cursor()
    148         self.cu.execute("create table test(id integer primary key, name text, income number)")
    149         self.cu.execute("insert into test(name) values (?)", ("foo",))
    150 
    151     def tearDown(self):
    152         self.cu.close()
    153         self.cx.close()
    154 
    155     def CheckExecuteNoArgs(self):
    156         self.cu.execute("delete from test")
    157 
    158     def CheckExecuteIllegalSql(self):
    159         try:
    160             self.cu.execute("select asdf")
    161             self.fail("should have raised an OperationalError")
    162         except sqlite.OperationalError:
    163             return
    164         except:
    165             self.fail("raised wrong exception")
    166 
    167     def CheckExecuteTooMuchSql(self):
    168         try:
    169             self.cu.execute("select 5+4; select 4+5")
    170             self.fail("should have raised a Warning")
    171         except sqlite.Warning:
    172             return
    173         except:
    174             self.fail("raised wrong exception")
    175 
    176     def CheckExecuteTooMuchSql2(self):
    177         self.cu.execute("select 5+4; -- foo bar")
    178 
    179     def CheckExecuteTooMuchSql3(self):
    180         self.cu.execute("""
    181             select 5+4;
    182 
    183             /*
    184             foo
    185             */
    186             """)
    187 
    188     def CheckExecuteWrongSqlArg(self):
    189         try:
    190             self.cu.execute(42)
    191             self.fail("should have raised a ValueError")
    192         except ValueError:
    193             return
    194         except:
    195             self.fail("raised wrong exception.")
    196 
    197     def CheckExecuteArgInt(self):
    198         self.cu.execute("insert into test(id) values (?)", (42,))
    199 
    200     def CheckExecuteArgFloat(self):
    201         self.cu.execute("insert into test(income) values (?)", (2500.32,))
    202 
    203     def CheckExecuteArgString(self):
    204         self.cu.execute("insert into test(name) values (?)", ("Hugo",))
    205 
    206     def CheckExecuteArgStringWithZeroByte(self):
    207         self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
    208 
    209         self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
    210         row = self.cu.fetchone()
    211         self.assertEqual(row[0], "Hu\x00go")
    212 
    213     def CheckExecuteWrongNoOfArgs1(self):
    214         # too many parameters
    215         try:
    216             self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
    217             self.fail("should have raised ProgrammingError")
    218         except sqlite.ProgrammingError:
    219             pass
    220 
    221     def CheckExecuteWrongNoOfArgs2(self):
    222         # too little parameters
    223         try:
    224             self.cu.execute("insert into test(id) values (?)")
    225             self.fail("should have raised ProgrammingError")
    226         except sqlite.ProgrammingError:
    227             pass
    228 
    229     def CheckExecuteWrongNoOfArgs3(self):
    230         # no parameters, parameters are needed
    231         try:
    232             self.cu.execute("insert into test(id) values (?)")
    233             self.fail("should have raised ProgrammingError")
    234         except sqlite.ProgrammingError:
    235             pass
    236 
    237     def CheckExecuteParamList(self):
    238         self.cu.execute("insert into test(name) values ('foo')")
    239         self.cu.execute("select name from test where name=?", ["foo"])
    240         row = self.cu.fetchone()
    241         self.assertEqual(row[0], "foo")
    242 
    243     def CheckExecuteParamSequence(self):
    244         class L(object):
    245             def __len__(self):
    246                 return 1
    247             def __getitem__(self, x):
    248                 assert x == 0
    249                 return "foo"
    250 
    251         self.cu.execute("insert into test(name) values ('foo')")
    252         self.cu.execute("select name from test where name=?", L())
    253         row = self.cu.fetchone()
    254         self.assertEqual(row[0], "foo")
    255 
    256     def CheckExecuteDictMapping(self):
    257         self.cu.execute("insert into test(name) values ('foo')")
    258         self.cu.execute("select name from test where name=:name", {"name": "foo"})
    259         row = self.cu.fetchone()
    260         self.assertEqual(row[0], "foo")
    261 
    262     def CheckExecuteDictMapping_Mapping(self):
    263         # Test only works with Python 2.5 or later
    264         if sys.version_info < (2, 5, 0):
    265             return
    266 
    267         class D(dict):
    268             def __missing__(self, key):
    269                 return "foo"
    270 
    271         self.cu.execute("insert into test(name) values ('foo')")
    272         self.cu.execute("select name from test where name=:name", D())
    273         row = self.cu.fetchone()
    274         self.assertEqual(row[0], "foo")
    275 
    276     def CheckExecuteDictMappingTooLittleArgs(self):
    277         self.cu.execute("insert into test(name) values ('foo')")
    278         try:
    279             self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
    280             self.fail("should have raised ProgrammingError")
    281         except sqlite.ProgrammingError:
    282             pass
    283 
    284     def CheckExecuteDictMappingNoArgs(self):
    285         self.cu.execute("insert into test(name) values ('foo')")
    286         try:
    287             self.cu.execute("select name from test where name=:name")
    288             self.fail("should have raised ProgrammingError")
    289         except sqlite.ProgrammingError:
    290             pass
    291 
    292     def CheckExecuteDictMappingUnnamed(self):
    293         self.cu.execute("insert into test(name) values ('foo')")
    294         try:
    295             self.cu.execute("select name from test where name=?", {"name": "foo"})
    296             self.fail("should have raised ProgrammingError")
    297         except sqlite.ProgrammingError:
    298             pass
    299 
    300     def CheckClose(self):
    301         self.cu.close()
    302 
    303     def CheckRowcountExecute(self):
    304         self.cu.execute("delete from test")
    305         self.cu.execute("insert into test(name) values ('foo')")
    306         self.cu.execute("insert into test(name) values ('foo')")
    307         self.cu.execute("update test set name='bar'")
    308         self.assertEqual(self.cu.rowcount, 2)
    309 
    310     def CheckRowcountSelect(self):
    311         """
    312         pysqlite does not know the rowcount of SELECT statements, because we
    313         don't fetch all rows after executing the select statement. The rowcount
    314         has thus to be -1.
    315         """
    316         self.cu.execute("select 5 union select 6")
    317         self.assertEqual(self.cu.rowcount, -1)
    318 
    319     def CheckRowcountExecutemany(self):
    320         self.cu.execute("delete from test")
    321         self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
    322         self.assertEqual(self.cu.rowcount, 3)
    323 
    324     def CheckTotalChanges(self):
    325         self.cu.execute("insert into test(name) values ('foo')")
    326         self.cu.execute("insert into test(name) values ('foo')")
    327         if self.cx.total_changes < 2:
    328             self.fail("total changes reported wrong value")
    329 
    330     # Checks for executemany:
    331     # Sequences are required by the DB-API, iterators
    332     # enhancements in pysqlite.
    333 
    334     def CheckExecuteManySequence(self):
    335         self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
    336 
    337     def CheckExecuteManyIterator(self):
    338         class MyIter:
    339             def __init__(self):
    340                 self.value = 5
    341 
    342             def next(self):
    343                 if self.value == 10:
    344                     raise StopIteration
    345                 else:
    346                     self.value += 1
    347                     return (self.value,)
    348 
    349         self.cu.executemany("insert into test(income) values (?)", MyIter())
    350 
    351     def CheckExecuteManyGenerator(self):
    352         def mygen():
    353             for i in range(5):
    354                 yield (i,)
    355 
    356         self.cu.executemany("insert into test(income) values (?)", mygen())
    357 
    358     def CheckExecuteManyWrongSqlArg(self):
    359         try:
    360             self.cu.executemany(42, [(3,)])
    361             self.fail("should have raised a ValueError")
    362         except ValueError:
    363             return
    364         except:
    365             self.fail("raised wrong exception.")
    366 
    367     def CheckExecuteManySelect(self):
    368         try:
    369             self.cu.executemany("select ?", [(3,)])
    370             self.fail("should have raised a ProgrammingError")
    371         except sqlite.ProgrammingError:
    372             return
    373         except:
    374             self.fail("raised wrong exception.")
    375 
    376     def CheckExecuteManyNotIterable(self):
    377         try:
    378             self.cu.executemany("insert into test(income) values (?)", 42)
    379             self.fail("should have raised a TypeError")
    380         except TypeError:
    381             return
    382         except Exception, e:
    383             print "raised", e.__class__
    384             self.fail("raised wrong exception.")
    385 
    386     def CheckFetchIter(self):
    387         # Optional DB-API extension.
    388         self.cu.execute("delete from test")
    389         self.cu.execute("insert into test(id) values (?)", (5,))
    390         self.cu.execute("insert into test(id) values (?)", (6,))
    391         self.cu.execute("select id from test order by id")
    392         lst = []
    393         for row in self.cu:
    394             lst.append(row[0])
    395         self.assertEqual(lst[0], 5)
    396         self.assertEqual(lst[1], 6)
    397 
    398     def CheckFetchone(self):
    399         self.cu.execute("select name from test")
    400         row = self.cu.fetchone()
    401         self.assertEqual(row[0], "foo")
    402         row = self.cu.fetchone()
    403         self.assertEqual(row, None)
    404 
    405     def CheckFetchoneNoStatement(self):
    406         cur = self.cx.cursor()
    407         row = cur.fetchone()
    408         self.assertEqual(row, None)
    409 
    410     def CheckArraySize(self):
    411         # must default ot 1
    412         self.assertEqual(self.cu.arraysize, 1)
    413 
    414         # now set to 2
    415         self.cu.arraysize = 2
    416 
    417         # now make the query return 3 rows
    418         self.cu.execute("delete from test")
    419         self.cu.execute("insert into test(name) values ('A')")
    420         self.cu.execute("insert into test(name) values ('B')")
    421         self.cu.execute("insert into test(name) values ('C')")
    422         self.cu.execute("select name from test")
    423         res = self.cu.fetchmany()
    424 
    425         self.assertEqual(len(res), 2)
    426 
    427     def CheckFetchmany(self):
    428         self.cu.execute("select name from test")
    429         res = self.cu.fetchmany(100)
    430         self.assertEqual(len(res), 1)
    431         res = self.cu.fetchmany(100)
    432         self.assertEqual(res, [])
    433 
    434     def CheckFetchmanyKwArg(self):
    435         """Checks if fetchmany works with keyword arguments"""
    436         self.cu.execute("select name from test")
    437         res = self.cu.fetchmany(size=100)
    438         self.assertEqual(len(res), 1)
    439 
    440     def CheckFetchall(self):
    441         self.cu.execute("select name from test")
    442         res = self.cu.fetchall()
    443         self.assertEqual(len(res), 1)
    444         res = self.cu.fetchall()
    445         self.assertEqual(res, [])
    446 
    447     def CheckSetinputsizes(self):
    448         self.cu.setinputsizes([3, 4, 5])
    449 
    450     def CheckSetoutputsize(self):
    451         self.cu.setoutputsize(5, 0)
    452 
    453     def CheckSetoutputsizeNoColumn(self):
    454         self.cu.setoutputsize(42)
    455 
    456     def CheckCursorConnection(self):
    457         # Optional DB-API extension.
    458         self.assertEqual(self.cu.connection, self.cx)
    459 
    460     def CheckWrongCursorCallable(self):
    461         try:
    462             def f(): pass
    463             cur = self.cx.cursor(f)
    464             self.fail("should have raised a TypeError")
    465         except TypeError:
    466             return
    467         self.fail("should have raised a ValueError")
    468 
    469     def CheckCursorWrongClass(self):
    470         class Foo: pass
    471         foo = Foo()
    472         try:
    473             cur = sqlite.Cursor(foo)
    474             self.fail("should have raised a ValueError")
    475         except TypeError:
    476             pass
    477 
    478 @unittest.skipUnless(threading, 'This test requires threading.')
    479 class ThreadTests(unittest.TestCase):
    480     def setUp(self):
    481         self.con = sqlite.connect(":memory:")
    482         self.cur = self.con.cursor()
    483         self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
    484 
    485     def tearDown(self):
    486         self.cur.close()
    487         self.con.close()
    488 
    489     def CheckConCursor(self):
    490         def run(con, errors):
    491             try:
    492                 cur = con.cursor()
    493                 errors.append("did not raise ProgrammingError")
    494                 return
    495             except sqlite.ProgrammingError:
    496                 return
    497             except:
    498                 errors.append("raised wrong exception")
    499 
    500         errors = []
    501         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    502         t.start()
    503         t.join()
    504         if len(errors) > 0:
    505             self.fail("\n".join(errors))
    506 
    507     def CheckConCommit(self):
    508         def run(con, errors):
    509             try:
    510                 con.commit()
    511                 errors.append("did not raise ProgrammingError")
    512                 return
    513             except sqlite.ProgrammingError:
    514                 return
    515             except:
    516                 errors.append("raised wrong exception")
    517 
    518         errors = []
    519         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    520         t.start()
    521         t.join()
    522         if len(errors) > 0:
    523             self.fail("\n".join(errors))
    524 
    525     def CheckConRollback(self):
    526         def run(con, errors):
    527             try:
    528                 con.rollback()
    529                 errors.append("did not raise ProgrammingError")
    530                 return
    531             except sqlite.ProgrammingError:
    532                 return
    533             except:
    534                 errors.append("raised wrong exception")
    535 
    536         errors = []
    537         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    538         t.start()
    539         t.join()
    540         if len(errors) > 0:
    541             self.fail("\n".join(errors))
    542 
    543     def CheckConClose(self):
    544         def run(con, errors):
    545             try:
    546                 con.close()
    547                 errors.append("did not raise ProgrammingError")
    548                 return
    549             except sqlite.ProgrammingError:
    550                 return
    551             except:
    552                 errors.append("raised wrong exception")
    553 
    554         errors = []
    555         t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
    556         t.start()
    557         t.join()
    558         if len(errors) > 0:
    559             self.fail("\n".join(errors))
    560 
    561     def CheckCurImplicitBegin(self):
    562         def run(cur, errors):
    563             try:
    564                 cur.execute("insert into test(name) values ('a')")
    565                 errors.append("did not raise ProgrammingError")
    566                 return
    567             except sqlite.ProgrammingError:
    568                 return
    569             except:
    570                 errors.append("raised wrong exception")
    571 
    572         errors = []
    573         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    574         t.start()
    575         t.join()
    576         if len(errors) > 0:
    577             self.fail("\n".join(errors))
    578 
    579     def CheckCurClose(self):
    580         def run(cur, errors):
    581             try:
    582                 cur.close()
    583                 errors.append("did not raise ProgrammingError")
    584                 return
    585             except sqlite.ProgrammingError:
    586                 return
    587             except:
    588                 errors.append("raised wrong exception")
    589 
    590         errors = []
    591         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    592         t.start()
    593         t.join()
    594         if len(errors) > 0:
    595             self.fail("\n".join(errors))
    596 
    597     def CheckCurExecute(self):
    598         def run(cur, errors):
    599             try:
    600                 cur.execute("select name from test")
    601                 errors.append("did not raise ProgrammingError")
    602                 return
    603             except sqlite.ProgrammingError:
    604                 return
    605             except:
    606                 errors.append("raised wrong exception")
    607 
    608         errors = []
    609         self.cur.execute("insert into test(name) values ('a')")
    610         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    611         t.start()
    612         t.join()
    613         if len(errors) > 0:
    614             self.fail("\n".join(errors))
    615 
    616     def CheckCurIterNext(self):
    617         def run(cur, errors):
    618             try:
    619                 row = cur.fetchone()
    620                 errors.append("did not raise ProgrammingError")
    621                 return
    622             except sqlite.ProgrammingError:
    623                 return
    624             except:
    625                 errors.append("raised wrong exception")
    626 
    627         errors = []
    628         self.cur.execute("insert into test(name) values ('a')")
    629         self.cur.execute("select name from test")
    630         t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
    631         t.start()
    632         t.join()
    633         if len(errors) > 0:
    634             self.fail("\n".join(errors))
    635 
    636 class ConstructorTests(unittest.TestCase):
    637     def CheckDate(self):
    638         d = sqlite.Date(2004, 10, 28)
    639 
    640     def CheckTime(self):
    641         t = sqlite.Time(12, 39, 35)
    642 
    643     def CheckTimestamp(self):
    644         ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
    645 
    646     def CheckDateFromTicks(self):
    647         d = sqlite.DateFromTicks(42)
    648 
    649     def CheckTimeFromTicks(self):
    650         t = sqlite.TimeFromTicks(42)
    651 
    652     def CheckTimestampFromTicks(self):
    653         ts = sqlite.TimestampFromTicks(42)
    654 
    655     def CheckBinary(self):
    656         b = sqlite.Binary(chr(0) + "'")
    657 
    658 class ExtensionTests(unittest.TestCase):
    659     def CheckScriptStringSql(self):
    660         con = sqlite.connect(":memory:")
    661         cur = con.cursor()
    662         cur.executescript("""
    663             -- bla bla
    664             /* a stupid comment */
    665             create table a(i);
    666             insert into a(i) values (5);
    667             """)
    668         cur.execute("select i from a")
    669         res = cur.fetchone()[0]
    670         self.assertEqual(res, 5)
    671 
    672     def CheckScriptStringUnicode(self):
    673         con = sqlite.connect(":memory:")
    674         cur = con.cursor()
    675         cur.executescript(u"""
    676             create table a(i);
    677             insert into a(i) values (5);
    678             select i from a;
    679             delete from a;
    680             insert into a(i) values (6);
    681             """)
    682         cur.execute("select i from a")
    683         res = cur.fetchone()[0]
    684         self.assertEqual(res, 6)
    685 
    686     def CheckScriptSyntaxError(self):
    687         con = sqlite.connect(":memory:")
    688         cur = con.cursor()
    689         raised = False
    690         try:
    691             cur.executescript("create table test(x); asdf; create table test2(x)")
    692         except sqlite.OperationalError:
    693             raised = True
    694         self.assertEqual(raised, True, "should have raised an exception")
    695 
    696     def CheckScriptErrorNormal(self):
    697         con = sqlite.connect(":memory:")
    698         cur = con.cursor()
    699         raised = False
    700         try:
    701             cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
    702         except sqlite.OperationalError:
    703             raised = True
    704         self.assertEqual(raised, True, "should have raised an exception")
    705 
    706     def CheckConnectionExecute(self):
    707         con = sqlite.connect(":memory:")
    708         result = con.execute("select 5").fetchone()[0]
    709         self.assertEqual(result, 5, "Basic test of Connection.execute")
    710 
    711     def CheckConnectionExecutemany(self):
    712         con = sqlite.connect(":memory:")
    713         con.execute("create table test(foo)")
    714         con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
    715         result = con.execute("select foo from test order by foo").fetchall()
    716         self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
    717         self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
    718 
    719     def CheckConnectionExecutescript(self):
    720         con = sqlite.connect(":memory:")
    721         con.executescript("create table test(foo); insert into test(foo) values (5);")
    722         result = con.execute("select foo from test").fetchone()[0]
    723         self.assertEqual(result, 5, "Basic test of Connection.executescript")
    724 
    725 class ClosedConTests(unittest.TestCase):
    726     def setUp(self):
    727         pass
    728 
    729     def tearDown(self):
    730         pass
    731 
    732     def CheckClosedConCursor(self):
    733         con = sqlite.connect(":memory:")
    734         con.close()
    735         try:
    736             cur = con.cursor()
    737             self.fail("Should have raised a ProgrammingError")
    738         except sqlite.ProgrammingError:
    739             pass
    740         except:
    741             self.fail("Should have raised a ProgrammingError")
    742 
    743     def CheckClosedConCommit(self):
    744         con = sqlite.connect(":memory:")
    745         con.close()
    746         try:
    747             con.commit()
    748             self.fail("Should have raised a ProgrammingError")
    749         except sqlite.ProgrammingError:
    750             pass
    751         except:
    752             self.fail("Should have raised a ProgrammingError")
    753 
    754     def CheckClosedConRollback(self):
    755         con = sqlite.connect(":memory:")
    756         con.close()
    757         try:
    758             con.rollback()
    759             self.fail("Should have raised a ProgrammingError")
    760         except sqlite.ProgrammingError:
    761             pass
    762         except:
    763             self.fail("Should have raised a ProgrammingError")
    764 
    765     def CheckClosedCurExecute(self):
    766         con = sqlite.connect(":memory:")
    767         cur = con.cursor()
    768         con.close()
    769         try:
    770             cur.execute("select 4")
    771             self.fail("Should have raised a ProgrammingError")
    772         except sqlite.ProgrammingError:
    773             pass
    774         except:
    775             self.fail("Should have raised a ProgrammingError")
    776 
    777     def CheckClosedCreateFunction(self):
    778         con = sqlite.connect(":memory:")
    779         con.close()
    780         def f(x): return 17
    781         try:
    782             con.create_function("foo", 1, f)
    783             self.fail("Should have raised a ProgrammingError")
    784         except sqlite.ProgrammingError:
    785             pass
    786         except:
    787             self.fail("Should have raised a ProgrammingError")
    788 
    789     def CheckClosedCreateAggregate(self):
    790         con = sqlite.connect(":memory:")
    791         con.close()
    792         class Agg:
    793             def __init__(self):
    794                 pass
    795             def step(self, x):
    796                 pass
    797             def finalize(self):
    798                 return 17
    799         try:
    800             con.create_aggregate("foo", 1, Agg)
    801             self.fail("Should have raised a ProgrammingError")
    802         except sqlite.ProgrammingError:
    803             pass
    804         except:
    805             self.fail("Should have raised a ProgrammingError")
    806 
    807     def CheckClosedSetAuthorizer(self):
    808         con = sqlite.connect(":memory:")
    809         con.close()
    810         def authorizer(*args):
    811             return sqlite.DENY
    812         try:
    813             con.set_authorizer(authorizer)
    814             self.fail("Should have raised a ProgrammingError")
    815         except sqlite.ProgrammingError:
    816             pass
    817         except:
    818             self.fail("Should have raised a ProgrammingError")
    819 
    820     def CheckClosedSetProgressCallback(self):
    821         con = sqlite.connect(":memory:")
    822         con.close()
    823         def progress(): pass
    824         try:
    825             con.set_progress_handler(progress, 100)
    826             self.fail("Should have raised a ProgrammingError")
    827         except sqlite.ProgrammingError:
    828             pass
    829         except:
    830             self.fail("Should have raised a ProgrammingError")
    831 
    832     def CheckClosedCall(self):
    833         con = sqlite.connect(":memory:")
    834         con.close()
    835         try:
    836             con()
    837             self.fail("Should have raised a ProgrammingError")
    838         except sqlite.ProgrammingError:
    839             pass
    840         except:
    841             self.fail("Should have raised a ProgrammingError")
    842 
    843 class ClosedCurTests(unittest.TestCase):
    844     def setUp(self):
    845         pass
    846 
    847     def tearDown(self):
    848         pass
    849 
    850     def CheckClosed(self):
    851         con = sqlite.connect(":memory:")
    852         cur = con.cursor()
    853         cur.close()
    854 
    855         for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
    856             if method_name in ("execute", "executescript"):
    857                 params = ("select 4 union select 5",)
    858             elif method_name == "executemany":
    859                 params = ("insert into foo(bar) values (?)", [(3,), (4,)])
    860             else:
    861                 params = []
    862 
    863             try:
    864                 method = getattr(cur, method_name)
    865 
    866                 method(*params)
    867                 self.fail("Should have raised a ProgrammingError: method " + method_name)
    868             except sqlite.ProgrammingError:
    869                 pass
    870             except:
    871                 self.fail("Should have raised a ProgrammingError: " + method_name)
    872 
    873 def suite():
    874     module_suite = unittest.makeSuite(ModuleTests, "Check")
    875     connection_suite = unittest.makeSuite(ConnectionTests, "Check")
    876     cursor_suite = unittest.makeSuite(CursorTests, "Check")
    877     thread_suite = unittest.makeSuite(ThreadTests, "Check")
    878     constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
    879     ext_suite = unittest.makeSuite(ExtensionTests, "Check")
    880     closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
    881     closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
    882     return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
    883 
    884 def test():
    885     runner = unittest.TextTestRunner()
    886     runner.run(suite())
    887 
    888 if __name__ == "__main__":
    889     test()
    890