#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import datetime
import unittest
import sqlite3 as sqlite


class RegressionTests(unittest.TestCase):
    """Regression tests for bugs that were fixed in pysqlite / sqlite3.

    Test methods use the ``Check`` prefix instead of ``test``, so they are
    only collected by a loader configured with that prefix (see ``suite()``).
    ``setUp`` opens a shared in-memory connection as ``self.con``; tests that
    need special connection options open their own connection.
    """

    def setUp(self):
        """Open the shared in-memory connection used by most tests."""
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        """Close the shared connection opened in ``setUp``."""
        self.con.close()

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # The cursor must come from the PARSE_COLNAMES connection created
            # above, not from self.con, or the option is never exercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for x in range(5)]
        cursors[0].execute("create table test(x)")
        for i in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # The varying number of leading spaces makes every statement text
        # distinct, so each cursor gets its own statement.
        for i in range(5):
            cursors[i].execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        """
        Column names containing spaces are reported intact in
        cursor.description; the first assertion expects a trailing
        "[datetime]" suffix to be stripped from the reported name.
        """
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        """
        After a constraint violation on a column declared
        "on conflict rollback", a later commit() must not raise
        OperationalError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # Expected: the duplicate value violates the unique constraint.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Same statement text, but the table now has a different declared
        # column type.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter while an instance is
                # being destroyed.
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skips sqlite.Cursor.__init__.
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str):
            pass

        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skips sqlite.Connection.__init__.
                pass

        # No cleanup is registered here: the connection was never initialised.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        self.addCleanup(con.close)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself matters; the returned row is not inspected.
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass

            def __hash__(self):
                raise TypeError()

        var = NotHashable()
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        """
        Registering a collation whose name cannot be encoded must raise
        ProgrammingError.
        """
        def collation_cb(a, b):
            return 1

        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Runs while the outer executemany() on the same cursor is still
            # consuming this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                # Commit while the select cursor is still being iterated.
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                if counter == 0:
                    self.assertEqual(row[0], 0)
                elif counter == 1:
                    self.assertEqual(row[0], 1)
                elif counter == 2:
                    self.assertEqual(row[0], 2)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")


def suite():
    """Return a TestSuite holding every ``Check*`` method of RegressionTests.

    A dedicated TestLoader with ``testMethodPrefix = "Check"`` is used
    instead of ``unittest.makeSuite``, which is deprecated and no longer
    available in recent Python versions.
    """
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))


def test():
    """Run the regression suite with a plain text runner."""
    runner = unittest.TextTestRunner()
    runner.run(suite())


if __name__ == "__main__":
    test()