1 #-*- coding: ISO-8859-1 -*- 2 # pysqlite2/test/types.py: tests for type conversion and detection 3 # 4 # Copyright (C) 2005-2007 Gerhard Hring <gh (at] ghaering.de> 5 # 6 # This file is part of pysqlite. 7 # 8 # This software is provided 'as-is', without any express or implied 9 # warranty. In no event will the authors be held liable for any damages 10 # arising from the use of this software. 11 # 12 # Permission is granted to anyone to use this software for any purpose, 13 # including commercial applications, and to alter it and redistribute it 14 # freely, subject to the following restrictions: 15 # 16 # 1. The origin of this software must not be misrepresented; you must not 17 # claim that you wrote the original software. If you use this software 18 # in a product, an acknowledgment in the product documentation would be 19 # appreciated but is not required. 20 # 2. Altered source versions must be plainly marked as such, and must not be 21 # misrepresented as being the original software. 22 # 3. This notice may not be removed or altered from any source distribution. 23 24 import datetime 25 import unittest 26 import sqlite3 as sqlite 27 from test import test_support 28 try: 29 import zlib 30 except ImportError: 31 zlib = None 32 33 34 class SqliteTypeTests(unittest.TestCase): 35 def setUp(self): 36 self.con = sqlite.connect(":memory:") 37 self.cur = self.con.cursor() 38 self.cur.execute("create table test(i integer, s varchar, f number, b blob)") 39 40 def tearDown(self): 41 self.cur.close() 42 self.con.close() 43 44 def CheckString(self): 45 self.cur.execute("insert into test(s) values (?)", (u"sterreich",)) 46 self.cur.execute("select s from test") 47 row = self.cur.fetchone() 48 self.assertEqual(row[0], u"sterreich") 49 50 def CheckSmallInt(self): 51 self.cur.execute("insert into test(i) values (?)", (42,)) 52 self.cur.execute("select i from test") 53 row = self.cur.fetchone() 54 self.assertEqual(row[0], 42) 55 56 def CheckLargeInt(self): 57 num = 2**40 58 self.cur.execute("insert into test(i) values (?)", (num,)) 59 self.cur.execute("select i from test") 60 row = self.cur.fetchone() 61 self.assertEqual(row[0], num) 62 63 def CheckFloat(self): 64 val = 3.14 65 self.cur.execute("insert into test(f) values (?)", (val,)) 66 self.cur.execute("select f from test") 67 row = self.cur.fetchone() 68 self.assertEqual(row[0], val) 69 70 def CheckBlob(self): 71 with test_support.check_py3k_warnings(): 72 val = buffer("Guglhupf") 73 self.cur.execute("insert into test(b) values (?)", (val,)) 74 self.cur.execute("select b from test") 75 row = self.cur.fetchone() 76 self.assertEqual(row[0], val) 77 78 def CheckUnicodeExecute(self): 79 self.cur.execute(u"select 'sterreich'") 80 row = self.cur.fetchone() 81 self.assertEqual(row[0], u"sterreich") 82 83 def CheckNonUtf8_Default(self): 84 try: 85 self.cur.execute("select ?", (chr(150),)) 86 self.fail("should have raised a ProgrammingError") 87 except sqlite.ProgrammingError: 88 pass 89 90 def CheckNonUtf8_TextFactoryString(self): 91 orig_text_factory = self.con.text_factory 92 try: 93 self.con.text_factory = str 94 self.cur.execute("select ?", (chr(150),)) 95 finally: 96 self.con.text_factory = orig_text_factory 97 98 def CheckNonUtf8_TextFactoryOptimizedUnicode(self): 99 orig_text_factory = self.con.text_factory 100 try: 101 try: 102 self.con.text_factory = sqlite.OptimizedUnicode 103 self.cur.execute("select ?", (chr(150),)) 104 self.fail("should have raised a ProgrammingError") 105 except sqlite.ProgrammingError: 106 pass 107 finally: 108 self.con.text_factory = orig_text_factory 109 110 class DeclTypesTests(unittest.TestCase): 111 class Foo: 112 def __init__(self, _val): 113 self.val = _val 114 115 def __cmp__(self, other): 116 if not isinstance(other, DeclTypesTests.Foo): 117 raise ValueError 118 if self.val == other.val: 119 return 0 120 else: 121 return 1 122 123 def __conform__(self, protocol): 124 if protocol is sqlite.PrepareProtocol: 125 return self.val 126 else: 127 return None 128 129 def __str__(self): 130 return "<%s>" % self.val 131 132 def setUp(self): 133 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 134 self.cur = self.con.cursor() 135 self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))") 136 137 # override float, make them always return the same number 138 sqlite.converters["FLOAT"] = lambda x: 47.2 139 140 # and implement two custom ones 141 sqlite.converters["BOOL"] = lambda x: bool(int(x)) 142 sqlite.converters["FOO"] = DeclTypesTests.Foo 143 sqlite.converters["WRONG"] = lambda x: "WRONG" 144 sqlite.converters["NUMBER"] = float 145 146 def tearDown(self): 147 del sqlite.converters["FLOAT"] 148 del sqlite.converters["BOOL"] 149 del sqlite.converters["FOO"] 150 del sqlite.converters["NUMBER"] 151 self.cur.close() 152 self.con.close() 153 154 def CheckString(self): 155 # default 156 self.cur.execute("insert into test(s) values (?)", ("foo",)) 157 self.cur.execute('select s as "s [WRONG]" from test') 158 row = self.cur.fetchone() 159 self.assertEqual(row[0], "foo") 160 161 def CheckSmallInt(self): 162 # default 163 self.cur.execute("insert into test(i) values (?)", (42,)) 164 self.cur.execute("select i from test") 165 row = self.cur.fetchone() 166 self.assertEqual(row[0], 42) 167 168 def CheckLargeInt(self): 169 # default 170 num = 2**40 171 self.cur.execute("insert into test(i) values (?)", (num,)) 172 self.cur.execute("select i from test") 173 row = self.cur.fetchone() 174 self.assertEqual(row[0], num) 175 176 def CheckFloat(self): 177 # custom 178 val = 3.14 179 self.cur.execute("insert into test(f) values (?)", (val,)) 180 self.cur.execute("select f from test") 181 row = self.cur.fetchone() 182 self.assertEqual(row[0], 47.2) 183 184 def CheckBool(self): 185 # custom 186 self.cur.execute("insert into test(b) values (?)", (False,)) 187 self.cur.execute("select b from test") 188 row = self.cur.fetchone() 189 self.assertEqual(row[0], False) 190 191 self.cur.execute("delete from test") 192 self.cur.execute("insert into test(b) values (?)", (True,)) 193 self.cur.execute("select b from test") 194 row = self.cur.fetchone() 195 self.assertEqual(row[0], True) 196 197 def CheckUnicode(self): 198 # default 199 val = u"\xd6sterreich" 200 self.cur.execute("insert into test(u) values (?)", (val,)) 201 self.cur.execute("select u from test") 202 row = self.cur.fetchone() 203 self.assertEqual(row[0], val) 204 205 def CheckFoo(self): 206 val = DeclTypesTests.Foo("bla") 207 self.cur.execute("insert into test(foo) values (?)", (val,)) 208 self.cur.execute("select foo from test") 209 row = self.cur.fetchone() 210 self.assertEqual(row[0], val) 211 212 def CheckUnsupportedSeq(self): 213 class Bar: pass 214 val = Bar() 215 try: 216 self.cur.execute("insert into test(f) values (?)", (val,)) 217 self.fail("should have raised an InterfaceError") 218 except sqlite.InterfaceError: 219 pass 220 except: 221 self.fail("should have raised an InterfaceError") 222 223 def CheckUnsupportedDict(self): 224 class Bar: pass 225 val = Bar() 226 try: 227 self.cur.execute("insert into test(f) values (:val)", {"val": val}) 228 self.fail("should have raised an InterfaceError") 229 except sqlite.InterfaceError: 230 pass 231 except: 232 self.fail("should have raised an InterfaceError") 233 234 def CheckBlob(self): 235 # default 236 with test_support.check_py3k_warnings(): 237 val = buffer("Guglhupf") 238 self.cur.execute("insert into test(bin) values (?)", (val,)) 239 self.cur.execute("select bin from test") 240 row = self.cur.fetchone() 241 self.assertEqual(row[0], val) 242 243 def CheckNumber1(self): 244 self.cur.execute("insert into test(n1) values (5)") 245 value = self.cur.execute("select n1 from test").fetchone()[0] 246 # if the converter is not used, it's an int instead of a float 247 self.assertEqual(type(value), float) 248 249 def CheckNumber2(self): 250 """Checks whether converter names are cut off at '(' characters""" 251 self.cur.execute("insert into test(n2) values (5)") 252 value = self.cur.execute("select n2 from test").fetchone()[0] 253 # if the converter is not used, it's an int instead of a float 254 self.assertEqual(type(value), float) 255 256 class ColNamesTests(unittest.TestCase): 257 def setUp(self): 258 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 259 self.cur = self.con.cursor() 260 self.cur.execute("create table test(x foo)") 261 262 sqlite.converters["FOO"] = lambda x: "[%s]" % x 263 sqlite.converters["BAR"] = lambda x: "<%s>" % x 264 sqlite.converters["EXC"] = lambda x: 5 // 0 265 sqlite.converters["B1B1"] = lambda x: "MARKER" 266 267 def tearDown(self): 268 del sqlite.converters["FOO"] 269 del sqlite.converters["BAR"] 270 del sqlite.converters["EXC"] 271 del sqlite.converters["B1B1"] 272 self.cur.close() 273 self.con.close() 274 275 def CheckDeclTypeNotUsed(self): 276 """ 277 Assures that the declared type is not used when PARSE_DECLTYPES 278 is not set. 279 """ 280 self.cur.execute("insert into test(x) values (?)", ("xxx",)) 281 self.cur.execute("select x from test") 282 val = self.cur.fetchone()[0] 283 self.assertEqual(val, "xxx") 284 285 def CheckNone(self): 286 self.cur.execute("insert into test(x) values (?)", (None,)) 287 self.cur.execute("select x from test") 288 val = self.cur.fetchone()[0] 289 self.assertEqual(val, None) 290 291 def CheckColName(self): 292 self.cur.execute("insert into test(x) values (?)", ("xxx",)) 293 self.cur.execute('select x as "x [bar]" from test') 294 val = self.cur.fetchone()[0] 295 self.assertEqual(val, "<xxx>") 296 297 # Check if the stripping of colnames works. Everything after the first 298 # whitespace should be stripped. 299 self.assertEqual(self.cur.description[0][0], "x") 300 301 def CheckCaseInConverterName(self): 302 self.cur.execute("""select 'other' as "x [b1b1]\"""") 303 val = self.cur.fetchone()[0] 304 self.assertEqual(val, "MARKER") 305 306 def CheckCursorDescriptionNoRow(self): 307 """ 308 cursor.description should at least provide the column name(s), even if 309 no row returned. 310 """ 311 self.cur.execute("select * from test where 0 = 1") 312 self.assertEqual(self.cur.description[0][0], "x") 313 314 class ObjectAdaptationTests(unittest.TestCase): 315 def cast(obj): 316 return float(obj) 317 cast = staticmethod(cast) 318 319 def setUp(self): 320 self.con = sqlite.connect(":memory:") 321 try: 322 del sqlite.adapters[int] 323 except: 324 pass 325 sqlite.register_adapter(int, ObjectAdaptationTests.cast) 326 self.cur = self.con.cursor() 327 328 def tearDown(self): 329 del sqlite.adapters[(int, sqlite.PrepareProtocol)] 330 self.cur.close() 331 self.con.close() 332 333 def CheckCasterIsUsed(self): 334 self.cur.execute("select ?", (4,)) 335 val = self.cur.fetchone()[0] 336 self.assertEqual(type(val), float) 337 338 @unittest.skipUnless(zlib, "requires zlib") 339 class BinaryConverterTests(unittest.TestCase): 340 def convert(s): 341 return zlib.decompress(s) 342 convert = staticmethod(convert) 343 344 def setUp(self): 345 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 346 sqlite.register_converter("bin", BinaryConverterTests.convert) 347 348 def tearDown(self): 349 self.con.close() 350 351 def CheckBinaryInputForConverter(self): 352 testdata = "abcdefg" * 10 353 with test_support.check_py3k_warnings(): 354 result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0] 355 self.assertEqual(testdata, result) 356 357 class DateTimeTests(unittest.TestCase): 358 def setUp(self): 359 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 360 self.cur = self.con.cursor() 361 self.cur.execute("create table test(d date, ts timestamp)") 362 363 def tearDown(self): 364 self.cur.close() 365 self.con.close() 366 367 def CheckSqliteDate(self): 368 d = sqlite.Date(2004, 2, 14) 369 self.cur.execute("insert into test(d) values (?)", (d,)) 370 self.cur.execute("select d from test") 371 d2 = self.cur.fetchone()[0] 372 self.assertEqual(d, d2) 373 374 def CheckSqliteTimestamp(self): 375 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) 376 self.cur.execute("insert into test(ts) values (?)", (ts,)) 377 self.cur.execute("select ts from test") 378 ts2 = self.cur.fetchone()[0] 379 self.assertEqual(ts, ts2) 380 381 def CheckSqlTimestamp(self): 382 # The date functions are only available in SQLite version 3.1 or later 383 if sqlite.sqlite_version_info < (3, 1): 384 return 385 386 # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time. 387 now = datetime.datetime.now() 388 self.cur.execute("insert into test(ts) values (current_timestamp)") 389 self.cur.execute("select ts from test") 390 ts = self.cur.fetchone()[0] 391 self.assertEqual(type(ts), datetime.datetime) 392 self.assertEqual(ts.year, now.year) 393 394 def CheckDateTimeSubSeconds(self): 395 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) 396 self.cur.execute("insert into test(ts) values (?)", (ts,)) 397 self.cur.execute("select ts from test") 398 ts2 = self.cur.fetchone()[0] 399 self.assertEqual(ts, ts2) 400 401 def CheckDateTimeSubSecondsFloatingPoint(self): 402 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) 403 self.cur.execute("insert into test(ts) values (?)", (ts,)) 404 self.cur.execute("select ts from test") 405 ts2 = self.cur.fetchone()[0] 406 self.assertEqual(ts, ts2) 407 408 def suite(): 409 sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") 410 decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") 411 colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") 412 adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") 413 bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") 414 date_suite = unittest.makeSuite(DateTimeTests, "Check") 415 return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite)) 416 417 def test(): 418 runner = unittest.TextTestRunner() 419 runner.run(suite()) 420 421 if __name__ == "__main__": 422 test() 423