Home | History | Annotate | Download | only in test
      1 #-*- coding: iso-8859-1 -*-
      2 # pysqlite2/test/regression.py: pysqlite regression tests
      3 #
      4 # Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import datetime
     25 import unittest
     26 import sqlite3 as sqlite
     27 import weakref
     28 from test import support
     29 
     30 class RegressionTests(unittest.TestCase):
     31     def setUp(self):
     32         self.con = sqlite.connect(":memory:")
     33 
     34     def tearDown(self):
     35         self.con.close()
     36 
     37     def CheckPragmaUserVersion(self):
     38         # This used to crash pysqlite because this pragma command returns NULL for the column name
     39         cur = self.con.cursor()
     40         cur.execute("pragma user_version")
     41 
     42     def CheckPragmaSchemaVersion(self):
     43         # This still crashed pysqlite <= 2.2.1
     44         con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
     45         try:
     46             cur = self.con.cursor()
     47             cur.execute("pragma schema_version")
     48         finally:
     49             cur.close()
     50             con.close()
     51 
     52     def CheckStatementReset(self):
     53         # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
     54         # reset before a rollback, but only those that are still in the
     55         # statement cache. The others are not accessible from the connection object.
     56         con = sqlite.connect(":memory:", cached_statements=5)
     57         cursors = [con.cursor() for x in xrange(5)]
     58         cursors[0].execute("create table test(x)")
     59         for i in range(10):
     60             cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
     61 
     62         for i in range(5):
     63             cursors[i].execute(" " * i + "select x from test")
     64 
     65         con.rollback()
     66 
     67     def CheckColumnNameWithSpaces(self):
     68         cur = self.con.cursor()
     69         cur.execute('select 1 as "foo bar [datetime]"')
     70         self.assertEqual(cur.description[0][0], "foo bar")
     71 
     72         cur.execute('select 1 as "foo baz"')
     73         self.assertEqual(cur.description[0][0], "foo baz")
     74 
     75     def CheckStatementFinalizationOnCloseDb(self):
     76         # pysqlite versions <= 2.3.3 only finalized statements in the statement
     77         # cache when closing the database. statements that were still
     78         # referenced in cursors weren't closed and could provoke "
     79         # "OperationalError: Unable to close due to unfinalised statements".
     80         con = sqlite.connect(":memory:")
     81         cursors = []
     82         # default statement cache size is 100
     83         for i in range(105):
     84             cur = con.cursor()
     85             cursors.append(cur)
     86             cur.execute("select 1 x union select " + str(i))
     87         con.close()
     88 
     89     def CheckOnConflictRollback(self):
     90         if sqlite.sqlite_version_info < (3, 2, 2):
     91             return
     92         con = sqlite.connect(":memory:")
     93         con.execute("create table foo(x, unique(x) on conflict rollback)")
     94         con.execute("insert into foo(x) values (1)")
     95         try:
     96             con.execute("insert into foo(x) values (1)")
     97         except sqlite.DatabaseError:
     98             pass
     99         con.execute("insert into foo(x) values (2)")
    100         try:
    101             con.commit()
    102         except sqlite.OperationalError:
    103             self.fail("pysqlite knew nothing about the implicit ROLLBACK")
    104 
    105     def CheckWorkaroundForBuggySqliteTransferBindings(self):
    106         """
    107         pysqlite would crash with older SQLite versions unless
    108         a workaround is implemented.
    109         """
    110         self.con.execute("create table foo(bar)")
    111         self.con.execute("drop table foo")
    112         self.con.execute("create table foo(bar)")
    113 
    114     def CheckEmptyStatement(self):
    115         """
    116         pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
    117         for "no-operation" statements
    118         """
    119         self.con.execute("")
    120 
    121     def CheckUnicodeConnect(self):
    122         """
    123         With pysqlite 2.4.0 you needed to use a string or an APSW connection
    124         object for opening database connections.
    125 
    126         Formerly, both bytestrings and unicode strings used to work.
    127 
    128         Let's make sure unicode strings work in the future.
    129         """
    130         con = sqlite.connect(u":memory:")
    131         con.close()
    132 
    133     def CheckTypeMapUsage(self):
    134         """
    135         pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
    136         a statement. This test exhibits the problem.
    137         """
    138         SELECT = "select * from foo"
    139         con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
    140         con.execute("create table foo(bar timestamp)")
    141         con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
    142         con.execute(SELECT)
    143         con.execute("drop table foo")
    144         con.execute("create table foo(bar integer)")
    145         con.execute("insert into foo(bar) values (5)")
    146         con.execute(SELECT)
    147 
    148     def CheckRegisterAdapter(self):
    149         """
    150         See issue 3312.
    151         """
    152         self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
    153 
    154     def CheckSetIsolationLevel(self):
    155         """
    156         See issue 3312.
    157         """
    158         con = sqlite.connect(":memory:")
    159         self.assertRaises(UnicodeEncodeError, setattr, con,
    160                           "isolation_level", u"\xe9")
    161 
    162     def CheckCursorConstructorCallCheck(self):
    163         """
    164         Verifies that cursor methods check whether base class __init__ was
    165         called.
    166         """
    167         class Cursor(sqlite.Cursor):
    168             def __init__(self, con):
    169                 pass
    170 
    171         con = sqlite.connect(":memory:")
    172         cur = Cursor(con)
    173         try:
    174             cur.execute("select 4+5").fetchall()
    175             self.fail("should have raised ProgrammingError")
    176         except sqlite.ProgrammingError:
    177             pass
    178         except:
    179             self.fail("should have raised ProgrammingError")
    180         with self.assertRaisesRegexp(sqlite.ProgrammingError,
    181                                      r'^Base Cursor\.__init__ not called\.$'):
    182             cur.close()
    183 
    184     def CheckConnectionConstructorCallCheck(self):
    185         """
    186         Verifies that connection methods check whether base class __init__ was
    187         called.
    188         """
    189         class Connection(sqlite.Connection):
    190             def __init__(self, name):
    191                 pass
    192 
    193         con = Connection(":memory:")
    194         try:
    195             cur = con.cursor()
    196             self.fail("should have raised ProgrammingError")
    197         except sqlite.ProgrammingError:
    198             pass
    199         except:
    200             self.fail("should have raised ProgrammingError")
    201 
    202     def CheckCursorRegistration(self):
    203         """
    204         Verifies that subclassed cursor classes are correctly registered with
    205         the connection object, too.  (fetch-across-rollback problem)
    206         """
    207         class Connection(sqlite.Connection):
    208             def cursor(self):
    209                 return Cursor(self)
    210 
    211         class Cursor(sqlite.Cursor):
    212             def __init__(self, con):
    213                 sqlite.Cursor.__init__(self, con)
    214 
    215         con = Connection(":memory:")
    216         cur = con.cursor()
    217         cur.execute("create table foo(x)")
    218         cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
    219         cur.execute("select x from foo")
    220         con.rollback()
    221         try:
    222             cur.fetchall()
    223             self.fail("should have raised InterfaceError")
    224         except sqlite.InterfaceError:
    225             pass
    226         except:
    227             self.fail("should have raised InterfaceError")
    228 
    229     def CheckAutoCommit(self):
    230         """
    231         Verifies that creating a connection in autocommit mode works.
    232         2.5.3 introduced a regression so that these could no longer
    233         be created.
    234         """
    235         con = sqlite.connect(":memory:", isolation_level=None)
    236 
    237     def CheckPragmaAutocommit(self):
    238         """
    239         Verifies that running a PRAGMA statement that does an autocommit does
    240         work. This did not work in 2.5.3/2.5.4.
    241         """
    242         cur = self.con.cursor()
    243         cur.execute("create table foo(bar)")
    244         cur.execute("insert into foo(bar) values (5)")
    245 
    246         cur.execute("pragma page_size")
    247         row = cur.fetchone()
    248 
    249     def CheckSetDict(self):
    250         """
    251         See http://bugs.python.org/issue7478
    252 
    253         It was possible to successfully register callbacks that could not be
    254         hashed. Return codes of PyDict_SetItem were not checked properly.
    255         """
    256         class NotHashable:
    257             def __call__(self, *args, **kw):
    258                 pass
    259             def __hash__(self):
    260                 raise TypeError()
    261         var = NotHashable()
    262         self.assertRaises(TypeError, self.con.create_function, var)
    263         self.assertRaises(TypeError, self.con.create_aggregate, var)
    264         self.assertRaises(TypeError, self.con.set_authorizer, var)
    265         self.assertRaises(TypeError, self.con.set_progress_handler, var)
    266 
    267     def CheckConnectionCall(self):
    268         """
    269         Call a connection with a non-string SQL request: check error handling
    270         of the statement constructor.
    271         """
    272         self.assertRaises(sqlite.Warning, self.con, 1)
    273 
    274     def CheckRecursiveCursorUse(self):
    275         """
    276         http://bugs.python.org/issue10811
    277 
    278         Recursively using a cursor, such as when reusing it from a generator led to segfaults.
    279         Now we catch recursive cursor usage and raise a ProgrammingError.
    280         """
    281         con = sqlite.connect(":memory:")
    282 
    283         cur = con.cursor()
    284         cur.execute("create table a (bar)")
    285         cur.execute("create table b (baz)")
    286 
    287         def foo():
    288             cur.execute("insert into a (bar) values (?)", (1,))
    289             yield 1
    290 
    291         with self.assertRaises(sqlite.ProgrammingError):
    292             cur.executemany("insert into b (baz) values (?)",
    293                             ((i,) for i in foo()))
    294 
    295     def CheckConvertTimestampMicrosecondPadding(self):
    296         """
    297         http://bugs.python.org/issue14720
    298 
    299         The microsecond parsing of convert_timestamp() should pad with zeros,
    300         since the microsecond string "456" actually represents "456000".
    301         """
    302 
    303         con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    304         cur = con.cursor()
    305         cur.execute("CREATE TABLE t (x TIMESTAMP)")
    306 
    307         # Microseconds should be 456000
    308         cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
    309 
    310         # Microseconds should be truncated to 123456
    311         cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
    312 
    313         cur.execute("SELECT * FROM t")
    314         values = [x[0] for x in cur.fetchall()]
    315 
    316         self.assertEqual(values, [
    317             datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
    318             datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
    319         ])
    320 
    321     def CheckInvalidIsolationLevelType(self):
    322         # isolation level is a string, not an integer
    323         self.assertRaises(TypeError,
    324                           sqlite.connect, ":memory:", isolation_level=123)
    325 
    326 
    327     def CheckNullCharacter(self):
    328         # Issue #21147
    329         con = sqlite.connect(":memory:")
    330         self.assertRaises(ValueError, con, "\0select 1")
    331         self.assertRaises(ValueError, con, "select 1\0")
    332         cur = con.cursor()
    333         self.assertRaises(ValueError, cur.execute, " \0select 2")
    334         self.assertRaises(ValueError, cur.execute, "select 2\0")
    335 
    336     def CheckCommitCursorReset(self):
    337         """
    338         Connection.commit() did reset cursors, which made sqlite3
    339         to return rows multiple times when fetched from cursors
    340         after commit. See issues 10513 and 23129 for details.
    341         """
    342         con = sqlite.connect(":memory:")
    343         con.executescript("""
    344         create table t(c);
    345         create table t2(c);
    346         insert into t values(0);
    347         insert into t values(1);
    348         insert into t values(2);
    349         """)
    350 
    351         self.assertEqual(con.isolation_level, "")
    352 
    353         counter = 0
    354         for i, row in enumerate(con.execute("select c from t")):
    355             con.execute("insert into t2(c) values (?)", (i,))
    356             con.commit()
    357             if counter == 0:
    358                 self.assertEqual(row[0], 0)
    359             elif counter == 1:
    360                 self.assertEqual(row[0], 1)
    361             elif counter == 2:
    362                 self.assertEqual(row[0], 2)
    363             counter += 1
    364         self.assertEqual(counter, 3, "should have returned exactly three rows")
    365 
    366     def CheckBpo31770(self):
    367         """
    368         The interpreter shouldn't crash in case Cursor.__init__() is called
    369         more than once.
    370         """
    371         def callback(*args):
    372             pass
    373         con = sqlite.connect(":memory:")
    374         cur = sqlite.Cursor(con)
    375         ref = weakref.ref(cur, callback)
    376         cur.__init__(con)
    377         del cur
    378         # The interpreter shouldn't crash when ref is collected.
    379         del ref
    380         support.gc_collect()
    381 
    382 
    383 def suite():
    384     regression_suite = unittest.makeSuite(RegressionTests, "Check")
    385     return unittest.TestSuite((regression_suite,))
    386 
    387 def test():
    388     runner = unittest.TextTestRunner()
    389     runner.run(suite())
    390 
# Run the regression tests when this file is executed as a script.
if __name__ == "__main__":
    test()
    393