Home | History | Annotate | Download | only in test
      1 #-*- coding: iso-8859-1 -*-
      2 # pysqlite2/test/regression.py: pysqlite regression tests
      3 #
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import datetime
     25 import unittest
     26 import sqlite3 as sqlite
     27 
     28 class RegressionTests(unittest.TestCase):
     29     def setUp(self):
     30         self.con = sqlite.connect(":memory:")
     31 
     32     def tearDown(self):
     33         self.con.close()
     34 
     35     def CheckPragmaUserVersion(self):
     36         # This used to crash pysqlite because this pragma command returns NULL for the column name
     37         cur = self.con.cursor()
     38         cur.execute("pragma user_version")
     39 
     40     def CheckPragmaSchemaVersion(self):
     41         # This still crashed pysqlite <= 2.2.1
     42         con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
     43         try:
     44             cur = self.con.cursor()
     45             cur.execute("pragma schema_version")
     46         finally:
     47             cur.close()
     48             con.close()
     49 
     50     def CheckStatementReset(self):
     51         # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
     52         # reset before a rollback, but only those that are still in the
     53         # statement cache. The others are not accessible from the connection object.
     54         con = sqlite.connect(":memory:", cached_statements=5)
     55         cursors = [con.cursor() for x in range(5)]
     56         cursors[0].execute("create table test(x)")
     57         for i in range(10):
     58             cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
     59 
     60         for i in range(5):
     61             cursors[i].execute(" " * i + "select x from test")
     62 
     63         con.rollback()
     64 
     65     def CheckColumnNameWithSpaces(self):
     66         cur = self.con.cursor()
     67         cur.execute('select 1 as "foo bar [datetime]"')
     68         self.assertEqual(cur.description[0][0], "foo bar")
     69 
     70         cur.execute('select 1 as "foo baz"')
     71         self.assertEqual(cur.description[0][0], "foo baz")
     72 
     73     def CheckStatementFinalizationOnCloseDb(self):
     74         # pysqlite versions <= 2.3.3 only finalized statements in the statement
     75         # cache when closing the database. statements that were still
     76         # referenced in cursors weren't closed and could provoke "
     77         # "OperationalError: Unable to close due to unfinalised statements".
     78         con = sqlite.connect(":memory:")
     79         cursors = []
     80         # default statement cache size is 100
     81         for i in range(105):
     82             cur = con.cursor()
     83             cursors.append(cur)
     84             cur.execute("select 1 x union select " + str(i))
     85         con.close()
     86 
     87     @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
     88     def CheckOnConflictRollback(self):
     89         con = sqlite.connect(":memory:")
     90         con.execute("create table foo(x, unique(x) on conflict rollback)")
     91         con.execute("insert into foo(x) values (1)")
     92         try:
     93             con.execute("insert into foo(x) values (1)")
     94         except sqlite.DatabaseError:
     95             pass
     96         con.execute("insert into foo(x) values (2)")
     97         try:
     98             con.commit()
     99         except sqlite.OperationalError:
    100             self.fail("pysqlite knew nothing about the implicit ROLLBACK")
    101 
    102     def CheckWorkaroundForBuggySqliteTransferBindings(self):
    103         """
    104         pysqlite would crash with older SQLite versions unless
    105         a workaround is implemented.
    106         """
    107         self.con.execute("create table foo(bar)")
    108         self.con.execute("drop table foo")
    109         self.con.execute("create table foo(bar)")
    110 
    111     def CheckEmptyStatement(self):
    112         """
    113         pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
    114         for "no-operation" statements
    115         """
    116         self.con.execute("")
    117 
    118     def CheckTypeMapUsage(self):
    119         """
    120         pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
    121         a statement. This test exhibits the problem.
    122         """
    123         SELECT = "select * from foo"
    124         con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
    125         con.execute("create table foo(bar timestamp)")
    126         con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
    127         con.execute(SELECT)
    128         con.execute("drop table foo")
    129         con.execute("create table foo(bar integer)")
    130         con.execute("insert into foo(bar) values (5)")
    131         con.execute(SELECT)
    132 
    133     def CheckErrorMsgDecodeError(self):
    134         # When porting the module to Python 3.0, the error message about
    135         # decoding errors disappeared. This verifies they're back again.
    136         with self.assertRaises(sqlite.OperationalError) as cm:
    137             self.con.execute("select 'xxx' || ? || 'yyy' colname",
    138                              (bytes(bytearray([250])),)).fetchone()
    139         msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
    140         self.assertIn(msg, str(cm.exception))
    141 
    142     def CheckRegisterAdapter(self):
    143         """
    144         See issue 3312.
    145         """
    146         self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
    147 
    148     def CheckSetIsolationLevel(self):
    149         # See issue 27881.
    150         class CustomStr(str):
    151             def upper(self):
    152                 return None
    153             def __del__(self):
    154                 con.isolation_level = ""
    155 
    156         con = sqlite.connect(":memory:")
    157         con.isolation_level = None
    158         for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
    159             with self.subTest(level=level):
    160                 con.isolation_level = level
    161                 con.isolation_level = level.lower()
    162                 con.isolation_level = level.capitalize()
    163                 con.isolation_level = CustomStr(level)
    164 
    165         # setting isolation_level failure should not alter previous state
    166         con.isolation_level = None
    167         con.isolation_level = "DEFERRED"
    168         pairs = [
    169             (1, TypeError), (b'', TypeError), ("abc", ValueError),
    170             ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
    171         ]
    172         for value, exc in pairs:
    173             with self.subTest(level=value):
    174                 with self.assertRaises(exc):
    175                     con.isolation_level = value
    176                 self.assertEqual(con.isolation_level, "DEFERRED")
    177 
    178     def CheckCursorConstructorCallCheck(self):
    179         """
    180         Verifies that cursor methods check whether base class __init__ was
    181         called.
    182         """
    183         class Cursor(sqlite.Cursor):
    184             def __init__(self, con):
    185                 pass
    186 
    187         con = sqlite.connect(":memory:")
    188         cur = Cursor(con)
    189         with self.assertRaises(sqlite.ProgrammingError):
    190             cur.execute("select 4+5").fetchall()
    191 
    192     def CheckStrSubclass(self):
    193         """
    194         The Python 3.0 port of the module didn't cope with values of subclasses of str.
    195         """
    196         class MyStr(str): pass
    197         self.con.execute("select ?", (MyStr("abc"),))
    198 
    199     def CheckConnectionConstructorCallCheck(self):
    200         """
    201         Verifies that connection methods check whether base class __init__ was
    202         called.
    203         """
    204         class Connection(sqlite.Connection):
    205             def __init__(self, name):
    206                 pass
    207 
    208         con = Connection(":memory:")
    209         with self.assertRaises(sqlite.ProgrammingError):
    210             cur = con.cursor()
    211 
    212     def CheckCursorRegistration(self):
    213         """
    214         Verifies that subclassed cursor classes are correctly registered with
    215         the connection object, too.  (fetch-across-rollback problem)
    216         """
    217         class Connection(sqlite.Connection):
    218             def cursor(self):
    219                 return Cursor(self)
    220 
    221         class Cursor(sqlite.Cursor):
    222             def __init__(self, con):
    223                 sqlite.Cursor.__init__(self, con)
    224 
    225         con = Connection(":memory:")
    226         cur = con.cursor()
    227         cur.execute("create table foo(x)")
    228         cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
    229         cur.execute("select x from foo")
    230         con.rollback()
    231         with self.assertRaises(sqlite.InterfaceError):
    232             cur.fetchall()
    233 
    234     def CheckAutoCommit(self):
    235         """
    236         Verifies that creating a connection in autocommit mode works.
    237         2.5.3 introduced a regression so that these could no longer
    238         be created.
    239         """
    240         con = sqlite.connect(":memory:", isolation_level=None)
    241 
    242     def CheckPragmaAutocommit(self):
    243         """
    244         Verifies that running a PRAGMA statement that does an autocommit does
    245         work. This did not work in 2.5.3/2.5.4.
    246         """
    247         cur = self.con.cursor()
    248         cur.execute("create table foo(bar)")
    249         cur.execute("insert into foo(bar) values (5)")
    250 
    251         cur.execute("pragma page_size")
    252         row = cur.fetchone()
    253 
    254     def CheckSetDict(self):
    255         """
    256         See http://bugs.python.org/issue7478
    257 
    258         It was possible to successfully register callbacks that could not be
    259         hashed. Return codes of PyDict_SetItem were not checked properly.
    260         """
    261         class NotHashable:
    262             def __call__(self, *args, **kw):
    263                 pass
    264             def __hash__(self):
    265                 raise TypeError()
    266         var = NotHashable()
    267         self.assertRaises(TypeError, self.con.create_function, var)
    268         self.assertRaises(TypeError, self.con.create_aggregate, var)
    269         self.assertRaises(TypeError, self.con.set_authorizer, var)
    270         self.assertRaises(TypeError, self.con.set_progress_handler, var)
    271 
    272     def CheckConnectionCall(self):
    273         """
    274         Call a connection with a non-string SQL request: check error handling
    275         of the statement constructor.
    276         """
    277         self.assertRaises(sqlite.Warning, self.con, 1)
    278 
    279     def CheckCollation(self):
    280         def collation_cb(a, b):
    281             return 1
    282         self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
    283             # Lone surrogate cannot be encoded to the default encoding (utf8)
    284             "\uDC80", collation_cb)
    285 
    286     def CheckRecursiveCursorUse(self):
    287         """
    288         http://bugs.python.org/issue10811
    289 
    290         Recursively using a cursor, such as when reusing it from a generator led to segfaults.
    291         Now we catch recursive cursor usage and raise a ProgrammingError.
    292         """
    293         con = sqlite.connect(":memory:")
    294 
    295         cur = con.cursor()
    296         cur.execute("create table a (bar)")
    297         cur.execute("create table b (baz)")
    298 
    299         def foo():
    300             cur.execute("insert into a (bar) values (?)", (1,))
    301             yield 1
    302 
    303         with self.assertRaises(sqlite.ProgrammingError):
    304             cur.executemany("insert into b (baz) values (?)",
    305                             ((i,) for i in foo()))
    306 
    307     def CheckConvertTimestampMicrosecondPadding(self):
    308         """
    309         http://bugs.python.org/issue14720
    310 
    311         The microsecond parsing of convert_timestamp() should pad with zeros,
    312         since the microsecond string "456" actually represents "456000".
    313         """
    314 
    315         con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    316         cur = con.cursor()
    317         cur.execute("CREATE TABLE t (x TIMESTAMP)")
    318 
    319         # Microseconds should be 456000
    320         cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
    321 
    322         # Microseconds should be truncated to 123456
    323         cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
    324 
    325         cur.execute("SELECT * FROM t")
    326         values = [x[0] for x in cur.fetchall()]
    327 
    328         self.assertEqual(values, [
    329             datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
    330             datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
    331         ])
    332 
    333     def CheckInvalidIsolationLevelType(self):
    334         # isolation level is a string, not an integer
    335         self.assertRaises(TypeError,
    336                           sqlite.connect, ":memory:", isolation_level=123)
    337 
    338 
    339     def CheckNullCharacter(self):
    340         # Issue #21147
    341         con = sqlite.connect(":memory:")
    342         self.assertRaises(ValueError, con, "\0select 1")
    343         self.assertRaises(ValueError, con, "select 1\0")
    344         cur = con.cursor()
    345         self.assertRaises(ValueError, cur.execute, " \0select 2")
    346         self.assertRaises(ValueError, cur.execute, "select 2\0")
    347 
    348     def CheckCommitCursorReset(self):
    349         """
    350         Connection.commit() did reset cursors, which made sqlite3
    351         to return rows multiple times when fetched from cursors
    352         after commit. See issues 10513 and 23129 for details.
    353         """
    354         con = sqlite.connect(":memory:")
    355         con.executescript("""
    356         create table t(c);
    357         create table t2(c);
    358         insert into t values(0);
    359         insert into t values(1);
    360         insert into t values(2);
    361         """)
    362 
    363         self.assertEqual(con.isolation_level, "")
    364 
    365         counter = 0
    366         for i, row in enumerate(con.execute("select c from t")):
    367             with self.subTest(i=i, row=row):
    368                 con.execute("insert into t2(c) values (?)", (i,))
    369                 con.commit()
    370                 if counter == 0:
    371                     self.assertEqual(row[0], 0)
    372                 elif counter == 1:
    373                     self.assertEqual(row[0], 1)
    374                 elif counter == 2:
    375                     self.assertEqual(row[0], 2)
    376                 counter += 1
    377         self.assertEqual(counter, 3, "should have returned exactly three rows")
    378 
    379 
    380 def suite():
    381     regression_suite = unittest.makeSuite(RegressionTests, "Check")
    382     return unittest.TestSuite((regression_suite,))
    383 
    384 def test():
    385     runner = unittest.TextTestRunner()
    386     runner.run(suite())
    387 
# Run the regression suite when this file is executed directly as a script.
if __name__ == "__main__":
    test()
    390