#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/types.py: tests for type conversion and detection
#
# Copyright (C) 2005-2007 Gerhard Häring <gh (at] ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import datetime
import unittest
import sqlite3 as sqlite
try:
    import zlib
except ImportError:
    zlib = None


class SqliteTypeTests(unittest.TestCase):
    """Round-trip tests for values stored without any type detection.

    Each test inserts one value into a fresh in-memory table and checks
    that the same value is read back.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(i integer, s varchar, f number, b blob)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def CheckString(self):
        # Escaped non-ASCII character so the test does not depend on the
        # encoding the source file is saved in.
        self.cur.execute("insert into test(s) values (?)", (u"\xd6sterreich",))
        self.cur.execute("select s from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], u"\xd6sterreich")

    def CheckSmallInt(self):
        self.cur.execute("insert into test(i) values (?)", (42,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], 42)

    def CheckLargeInt(self):
        # 2**40 does not fit into 32 bits.
        num = 2**40
        self.cur.execute("insert into test(i) values (?)", (num,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], num)

    def CheckFloat(self):
        val = 3.14
        self.cur.execute("insert into test(f) values (?)", (val,))
        self.cur.execute("select f from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckBlob(self):
        val = buffer("Guglhupf")
        self.cur.execute("insert into test(b) values (?)", (val,))
        self.cur.execute("select b from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckUnicodeExecute(self):
        # The SQL statement itself is a unicode object with non-ASCII text.
        self.cur.execute(u"select '\xd6sterreich'")
        row = self.cur.fetchone()
        self.assertEqual(row[0], u"\xd6sterreich")

    def CheckNonUtf8_Default(self):
        # chr(150) is bound as a parameter and must be rejected with the
        # connection's default text_factory.
        with self.assertRaises(sqlite.ProgrammingError):
            self.cur.execute("select ?", (chr(150),))

    def CheckNonUtf8_TextFactoryString(self):
        # With text_factory = str the same parameter must be accepted.
        orig_text_factory = self.con.text_factory
        try:
            self.con.text_factory = str
            self.cur.execute("select ?", (chr(150),))
        finally:
            self.con.text_factory = orig_text_factory

    def CheckNonUtf8_TextFactoryOptimizedUnicode(self):
        orig_text_factory = self.con.text_factory
        try:
            self.con.text_factory = sqlite.OptimizedUnicode
            with self.assertRaises(sqlite.ProgrammingError):
                self.cur.execute("select ?", (chr(150),))
        finally:
            self.con.text_factory = orig_text_factory


class DeclTypesTests(unittest.TestCase):
    """Tests for converters selected by declared column type.

    The connection is opened with PARSE_DECLTYPES only; setUp registers a
    set of converters in the module-wide ``sqlite.converters`` dict and
    tearDown removes every one of them again.
    """

    # Names of all converters registered by setUp.  tearDown removes exactly
    # these so that no converter leaks into other test cases.
    _CONVERTER_NAMES = ("FLOAT", "BOOL", "FOO", "WRONG", "NUMBER")

    class Foo:
        """Value wrapper used both as an adaptable object and as a converter."""

        def __init__(self, _val):
            self.val = _val

        def __cmp__(self, other):
            # Returns 0 for equal wrapped values and 1 otherwise; comparing
            # with anything that is not a Foo raises ValueError.
            if not isinstance(other, DeclTypesTests.Foo):
                raise ValueError
            if self.val == other.val:
                return 0
            else:
                return 1

        def __conform__(self, protocol):
            # Adapts to the wrapped value for sqlite.PrepareProtocol and
            # returns None for any other protocol.
            if protocol is sqlite.PrepareProtocol:
                return self.val
            else:
                return None

        def __str__(self):
            return "<%s>" % self.val

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")

        # override float, make them always return the same number
        sqlite.converters["FLOAT"] = lambda x: 47.2

        # and implement two custom ones
        sqlite.converters["BOOL"] = lambda x: bool(int(x))
        sqlite.converters["FOO"] = DeclTypesTests.Foo
        sqlite.converters["WRONG"] = lambda x: "WRONG"
        sqlite.converters["NUMBER"] = float

    def tearDown(self):
        # Remove every converter setUp registered, including "WRONG".
        for name in self._CONVERTER_NAMES:
            del sqlite.converters[name]
        self.cur.close()
        self.con.close()

    def CheckString(self):
        # default
        # The "[WRONG]" column-name hint must not trigger the WRONG
        # converter: this connection only enables PARSE_DECLTYPES.
        self.cur.execute("insert into test(s) values (?)", ("foo",))
        self.cur.execute('select s as "s [WRONG]" from test')
        row = self.cur.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckSmallInt(self):
        # default
        self.cur.execute("insert into test(i) values (?)", (42,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], 42)

    def CheckLargeInt(self):
        # default
        num = 2**40
        self.cur.execute("insert into test(i) values (?)", (num,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], num)

    def CheckFloat(self):
        # custom: the FLOAT converter from setUp always returns 47.2
        val = 3.14
        self.cur.execute("insert into test(f) values (?)", (val,))
        self.cur.execute("select f from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], 47.2)

    def CheckBool(self):
        # custom
        self.cur.execute("insert into test(b) values (?)", (False,))
        self.cur.execute("select b from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], False)

        self.cur.execute("delete from test")
        self.cur.execute("insert into test(b) values (?)", (True,))
        self.cur.execute("select b from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], True)

    def CheckUnicode(self):
        # default
        val = u"\xd6sterreich"
        self.cur.execute("insert into test(u) values (?)", (val,))
        self.cur.execute("select u from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckFoo(self):
        # Foo is adapted through __conform__ on the way in and rebuilt by
        # the FOO converter on the way out.
        val = DeclTypesTests.Foo("bla")
        self.cur.execute("insert into test(foo) values (?)", (val,))
        self.cur.execute("select foo from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckUnsupportedSeq(self):
        # An object without any adaptation, passed in a parameter sequence.
        class Bar: pass
        val = Bar()
        with self.assertRaises(sqlite.InterfaceError):
            self.cur.execute("insert into test(f) values (?)", (val,))

    def CheckUnsupportedDict(self):
        # Same as above, but passed as a named parameter.
        class Bar: pass
        val = Bar()
        with self.assertRaises(sqlite.InterfaceError):
            self.cur.execute("insert into test(f) values (:val)", {"val": val})

    def CheckBlob(self):
        # default
        val = buffer("Guglhupf")
        self.cur.execute("insert into test(bin) values (?)", (val,))
        self.cur.execute("select bin from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckNumber1(self):
        self.cur.execute("insert into test(n1) values (5)")
        value = self.cur.execute("select n1 from test").fetchone()[0]
        # if the converter is not used, it's an int instead of a float
        self.assertEqual(type(value), float)

    def CheckNumber2(self):
        """Checks whether converter names are cut off at '(' characters"""
        self.cur.execute("insert into test(n2) values (5)")
        value = self.cur.execute("select n2 from test").fetchone()[0]
        # if the converter is not used, it's an int instead of a float
        self.assertEqual(type(value), float)


class ColNamesTests(unittest.TestCase):
    """Tests for converters selected by a "name [type]" column alias.

    The connection is opened with PARSE_COLNAMES only.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5 // 0
        sqlite.converters["B1B1"] = lambda x: "MARKER"

    def tearDown(self):
        del sqlite.converters["FOO"]
        del sqlite.converters["BAR"]
        del sqlite.converters["EXC"]
        del sqlite.converters["B1B1"]
        self.cur.close()
        self.con.close()

    def CheckDeclTypeNotUsed(self):
        """
        Assures that the declared type is not used when PARSE_DECLTYPES
        is not set.
        """
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "xxx")

    def CheckNone(self):
        self.cur.execute("insert into test(x) values (?)", (None,))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        self.assertIsNone(val)

    def CheckColName(self):
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute('select x as "x [bar]" from test')
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "<xxx>")

        # Check if the stripping of colnames works. Everything after the first
        # whitespace should be stripped.
        self.assertEqual(self.cur.description[0][0], "x")

    def CheckCaseInConverterName(self):
        # The lower-case "b1b1" hint must find the converter stored as "B1B1".
        self.cur.execute('select \'other\' as "x [b1b1]"')
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "MARKER")

    def CheckCursorDescriptionNoRow(self):
        """
        cursor.description should at least provide the column name(s), even if
        no row returned.
        """
        self.cur.execute("select * from test where 0 = 1")
        self.assertEqual(self.cur.description[0][0], "x")


class ObjectAdaptationTests(unittest.TestCase):
    """Tests that an adapter registered for a built-in type is applied."""

    @staticmethod
    def cast(obj):
        """Adapter under test: converts the bound value to a float."""
        return float(obj)

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        # Drop any adapter already registered for int.  Adapters are keyed
        # by (type, protocol), the same key tearDown deletes.
        sqlite.adapters.pop((int, sqlite.PrepareProtocol), None)
        sqlite.register_adapter(int, ObjectAdaptationTests.cast)
        self.cur = self.con.cursor()

    def tearDown(self):
        del sqlite.adapters[(int, sqlite.PrepareProtocol)]
        self.cur.close()
        self.con.close()

    def CheckCasterIsUsed(self):
        self.cur.execute("select ?", (4,))
        val = self.cur.fetchone()[0]
        self.assertEqual(type(val), float)


@unittest.skipUnless(zlib, "requires zlib")
class BinaryConverterTests(unittest.TestCase):
    """Tests that a converter receives binary column data unmodified."""

    @staticmethod
    def convert(s):
        """Converter under test: zlib-decompresses the stored value."""
        return zlib.decompress(s)

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)

    def tearDown(self):
        # Unregister the converter again; pop() keeps this a no-op should
        # the module store it under a different key than "BIN".
        sqlite.converters.pop("BIN", None)
        self.con.close()

    def CheckBinaryInputForConverter(self):
        testdata = "abcdefg" * 10
        compressed = buffer(zlib.compress(testdata))
        result = self.con.execute('select ? as "x [bin]"', (compressed,)).fetchone()[0]
        self.assertEqual(testdata, result)


class DateTimeTests(unittest.TestCase):
    """Round-trip tests for the date and timestamp column types."""

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(d date, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def CheckSqliteDate(self):
        d = sqlite.Date(2004, 2, 14)
        self.cur.execute("insert into test(d) values (?)", (d,))
        self.cur.execute("select d from test")
        d2 = self.cur.fetchone()[0]
        self.assertEqual(d, d2)

    def CheckSqliteTimestamp(self):
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def CheckSqlTimestamp(self):
        # The date functions are only available in SQLite version 3.1 or later
        if sqlite.sqlite_version_info < (3, 1):
            self.skipTest("date functions require SQLite 3.1 or later")

        # SQLite's current_timestamp uses UTC time, so the reference value is
        # taken in UTC as well; comparing against local time would report the
        # wrong year around New Year.
        now = datetime.datetime.utcnow()
        self.cur.execute("insert into test(ts) values (current_timestamp)")
        self.cur.execute("select ts from test")
        ts = self.cur.fetchone()[0]
        self.assertEqual(type(ts), datetime.datetime)
        self.assertEqual(ts.year, now.year)

    def CheckDateTimeSubSeconds(self):
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def CheckDateTimeSubSecondsFloatingPoint(self):
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)


def suite():
    """Return a TestSuite with every ``Check*`` method of this module."""
    # A loader with a custom prefix does what unittest.makeSuite(cls, "Check")
    # did, without relying on that deprecated helper.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_cases = (
        SqliteTypeTests,
        DeclTypesTests,
        ColNamesTests,
        ObjectAdaptationTests,
        BinaryConverterTests,
        DateTimeTests,
    )
    return unittest.TestSuite([loader.loadTestsFromTestCase(case) for case in test_cases])


def test():
    """Run the complete suite with a plain text runner."""
    runner = unittest.TextTestRunner()
    runner.run(suite())


if __name__ == "__main__":
    test()