Home | History | Annotate | Download | only in test
      1 #-*- coding: ISO-8859-1 -*-
      2 # pysqlite2/test/types.py: tests for type conversion and detection
      3 #
      4 # Copyright (C) 2005-2007 Gerhard Hring <gh (at] ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import datetime
     25 import unittest
     26 import sqlite3 as sqlite
     27 try:
     28     import zlib
     29 except ImportError:
     30     zlib = None
     31 
     32 
     33 class SqliteTypeTests(unittest.TestCase):
     34     def setUp(self):
     35         self.con = sqlite.connect(":memory:")
     36         self.cur = self.con.cursor()
     37         self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
     38 
     39     def tearDown(self):
     40         self.cur.close()
     41         self.con.close()
     42 
     43     def CheckString(self):
     44         self.cur.execute("insert into test(s) values (?)", (u"sterreich",))
     45         self.cur.execute("select s from test")
     46         row = self.cur.fetchone()
     47         self.assertEqual(row[0], u"sterreich")
     48 
     49     def CheckSmallInt(self):
     50         self.cur.execute("insert into test(i) values (?)", (42,))
     51         self.cur.execute("select i from test")
     52         row = self.cur.fetchone()
     53         self.assertEqual(row[0], 42)
     54 
     55     def CheckLargeInt(self):
     56         num = 2**40
     57         self.cur.execute("insert into test(i) values (?)", (num,))
     58         self.cur.execute("select i from test")
     59         row = self.cur.fetchone()
     60         self.assertEqual(row[0], num)
     61 
     62     def CheckFloat(self):
     63         val = 3.14
     64         self.cur.execute("insert into test(f) values (?)", (val,))
     65         self.cur.execute("select f from test")
     66         row = self.cur.fetchone()
     67         self.assertEqual(row[0], val)
     68 
     69     def CheckBlob(self):
     70         val = buffer("Guglhupf")
     71         self.cur.execute("insert into test(b) values (?)", (val,))
     72         self.cur.execute("select b from test")
     73         row = self.cur.fetchone()
     74         self.assertEqual(row[0], val)
     75 
     76     def CheckUnicodeExecute(self):
     77         self.cur.execute(u"select 'sterreich'")
     78         row = self.cur.fetchone()
     79         self.assertEqual(row[0], u"sterreich")
     80 
     81     def CheckNonUtf8_Default(self):
     82         try:
     83             self.cur.execute("select ?", (chr(150),))
     84             self.fail("should have raised a ProgrammingError")
     85         except sqlite.ProgrammingError:
     86             pass
     87 
     88     def CheckNonUtf8_TextFactoryString(self):
     89         orig_text_factory = self.con.text_factory
     90         try:
     91             self.con.text_factory = str
     92             self.cur.execute("select ?", (chr(150),))
     93         finally:
     94             self.con.text_factory = orig_text_factory
     95 
     96     def CheckNonUtf8_TextFactoryOptimizedUnicode(self):
     97         orig_text_factory = self.con.text_factory
     98         try:
     99             try:
    100                 self.con.text_factory = sqlite.OptimizedUnicode
    101                 self.cur.execute("select ?", (chr(150),))
    102                 self.fail("should have raised a ProgrammingError")
    103             except sqlite.ProgrammingError:
    104                 pass
    105         finally:
    106             self.con.text_factory = orig_text_factory
    107 
    108 class DeclTypesTests(unittest.TestCase):
    109     class Foo:
    110         def __init__(self, _val):
    111             self.val = _val
    112 
    113         def __cmp__(self, other):
    114             if not isinstance(other, DeclTypesTests.Foo):
    115                 raise ValueError
    116             if self.val == other.val:
    117                 return 0
    118             else:
    119                 return 1
    120 
    121         def __conform__(self, protocol):
    122             if protocol is sqlite.PrepareProtocol:
    123                 return self.val
    124             else:
    125                 return None
    126 
    127         def __str__(self):
    128             return "<%s>" % self.val
    129 
    130     def setUp(self):
    131         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    132         self.cur = self.con.cursor()
    133         self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")
    134 
    135         # override float, make them always return the same number
    136         sqlite.converters["FLOAT"] = lambda x: 47.2
    137 
    138         # and implement two custom ones
    139         sqlite.converters["BOOL"] = lambda x: bool(int(x))
    140         sqlite.converters["FOO"] = DeclTypesTests.Foo
    141         sqlite.converters["WRONG"] = lambda x: "WRONG"
    142         sqlite.converters["NUMBER"] = float
    143 
    144     def tearDown(self):
    145         del sqlite.converters["FLOAT"]
    146         del sqlite.converters["BOOL"]
    147         del sqlite.converters["FOO"]
    148         del sqlite.converters["NUMBER"]
    149         self.cur.close()
    150         self.con.close()
    151 
    152     def CheckString(self):
    153         # default
    154         self.cur.execute("insert into test(s) values (?)", ("foo",))
    155         self.cur.execute('select s as "s [WRONG]" from test')
    156         row = self.cur.fetchone()
    157         self.assertEqual(row[0], "foo")
    158 
    159     def CheckSmallInt(self):
    160         # default
    161         self.cur.execute("insert into test(i) values (?)", (42,))
    162         self.cur.execute("select i from test")
    163         row = self.cur.fetchone()
    164         self.assertEqual(row[0], 42)
    165 
    166     def CheckLargeInt(self):
    167         # default
    168         num = 2**40
    169         self.cur.execute("insert into test(i) values (?)", (num,))
    170         self.cur.execute("select i from test")
    171         row = self.cur.fetchone()
    172         self.assertEqual(row[0], num)
    173 
    174     def CheckFloat(self):
    175         # custom
    176         val = 3.14
    177         self.cur.execute("insert into test(f) values (?)", (val,))
    178         self.cur.execute("select f from test")
    179         row = self.cur.fetchone()
    180         self.assertEqual(row[0], 47.2)
    181 
    182     def CheckBool(self):
    183         # custom
    184         self.cur.execute("insert into test(b) values (?)", (False,))
    185         self.cur.execute("select b from test")
    186         row = self.cur.fetchone()
    187         self.assertEqual(row[0], False)
    188 
    189         self.cur.execute("delete from test")
    190         self.cur.execute("insert into test(b) values (?)", (True,))
    191         self.cur.execute("select b from test")
    192         row = self.cur.fetchone()
    193         self.assertEqual(row[0], True)
    194 
    195     def CheckUnicode(self):
    196         # default
    197         val = u"\xd6sterreich"
    198         self.cur.execute("insert into test(u) values (?)", (val,))
    199         self.cur.execute("select u from test")
    200         row = self.cur.fetchone()
    201         self.assertEqual(row[0], val)
    202 
    203     def CheckFoo(self):
    204         val = DeclTypesTests.Foo("bla")
    205         self.cur.execute("insert into test(foo) values (?)", (val,))
    206         self.cur.execute("select foo from test")
    207         row = self.cur.fetchone()
    208         self.assertEqual(row[0], val)
    209 
    210     def CheckUnsupportedSeq(self):
    211         class Bar: pass
    212         val = Bar()
    213         try:
    214             self.cur.execute("insert into test(f) values (?)", (val,))
    215             self.fail("should have raised an InterfaceError")
    216         except sqlite.InterfaceError:
    217             pass
    218         except:
    219             self.fail("should have raised an InterfaceError")
    220 
    221     def CheckUnsupportedDict(self):
    222         class Bar: pass
    223         val = Bar()
    224         try:
    225             self.cur.execute("insert into test(f) values (:val)", {"val": val})
    226             self.fail("should have raised an InterfaceError")
    227         except sqlite.InterfaceError:
    228             pass
    229         except:
    230             self.fail("should have raised an InterfaceError")
    231 
    232     def CheckBlob(self):
    233         # default
    234         val = buffer("Guglhupf")
    235         self.cur.execute("insert into test(bin) values (?)", (val,))
    236         self.cur.execute("select bin from test")
    237         row = self.cur.fetchone()
    238         self.assertEqual(row[0], val)
    239 
    240     def CheckNumber1(self):
    241         self.cur.execute("insert into test(n1) values (5)")
    242         value = self.cur.execute("select n1 from test").fetchone()[0]
    243         # if the converter is not used, it's an int instead of a float
    244         self.assertEqual(type(value), float)
    245 
    246     def CheckNumber2(self):
    247         """Checks wether converter names are cut off at '(' characters"""
    248         self.cur.execute("insert into test(n2) values (5)")
    249         value = self.cur.execute("select n2 from test").fetchone()[0]
    250         # if the converter is not used, it's an int instead of a float
    251         self.assertEqual(type(value), float)
    252 
    253 class ColNamesTests(unittest.TestCase):
    254     def setUp(self):
    255         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    256         self.cur = self.con.cursor()
    257         self.cur.execute("create table test(x foo)")
    258 
    259         sqlite.converters["FOO"] = lambda x: "[%s]" % x
    260         sqlite.converters["BAR"] = lambda x: "<%s>" % x
    261         sqlite.converters["EXC"] = lambda x: 5 // 0
    262         sqlite.converters["B1B1"] = lambda x: "MARKER"
    263 
    264     def tearDown(self):
    265         del sqlite.converters["FOO"]
    266         del sqlite.converters["BAR"]
    267         del sqlite.converters["EXC"]
    268         del sqlite.converters["B1B1"]
    269         self.cur.close()
    270         self.con.close()
    271 
    272     def CheckDeclTypeNotUsed(self):
    273         """
    274         Assures that the declared type is not used when PARSE_DECLTYPES
    275         is not set.
    276         """
    277         self.cur.execute("insert into test(x) values (?)", ("xxx",))
    278         self.cur.execute("select x from test")
    279         val = self.cur.fetchone()[0]
    280         self.assertEqual(val, "xxx")
    281 
    282     def CheckNone(self):
    283         self.cur.execute("insert into test(x) values (?)", (None,))
    284         self.cur.execute("select x from test")
    285         val = self.cur.fetchone()[0]
    286         self.assertEqual(val, None)
    287 
    288     def CheckColName(self):
    289         self.cur.execute("insert into test(x) values (?)", ("xxx",))
    290         self.cur.execute('select x as "x [bar]" from test')
    291         val = self.cur.fetchone()[0]
    292         self.assertEqual(val, "<xxx>")
    293 
    294         # Check if the stripping of colnames works. Everything after the first
    295         # whitespace should be stripped.
    296         self.assertEqual(self.cur.description[0][0], "x")
    297 
    298     def CheckCaseInConverterName(self):
    299         self.cur.execute("""select 'other' as "x [b1b1]\"""")
    300         val = self.cur.fetchone()[0]
    301         self.assertEqual(val, "MARKER")
    302 
    303     def CheckCursorDescriptionNoRow(self):
    304         """
    305         cursor.description should at least provide the column name(s), even if
    306         no row returned.
    307         """
    308         self.cur.execute("select * from test where 0 = 1")
    309         self.assertEqual(self.cur.description[0][0], "x")
    310 
    311 class ObjectAdaptationTests(unittest.TestCase):
    312     def cast(obj):
    313         return float(obj)
    314     cast = staticmethod(cast)
    315 
    316     def setUp(self):
    317         self.con = sqlite.connect(":memory:")
    318         try:
    319             del sqlite.adapters[int]
    320         except:
    321             pass
    322         sqlite.register_adapter(int, ObjectAdaptationTests.cast)
    323         self.cur = self.con.cursor()
    324 
    325     def tearDown(self):
    326         del sqlite.adapters[(int, sqlite.PrepareProtocol)]
    327         self.cur.close()
    328         self.con.close()
    329 
    330     def CheckCasterIsUsed(self):
    331         self.cur.execute("select ?", (4,))
    332         val = self.cur.fetchone()[0]
    333         self.assertEqual(type(val), float)
    334 
    335 @unittest.skipUnless(zlib, "requires zlib")
    336 class BinaryConverterTests(unittest.TestCase):
    337     def convert(s):
    338         return zlib.decompress(s)
    339     convert = staticmethod(convert)
    340 
    341     def setUp(self):
    342         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    343         sqlite.register_converter("bin", BinaryConverterTests.convert)
    344 
    345     def tearDown(self):
    346         self.con.close()
    347 
    348     def CheckBinaryInputForConverter(self):
    349         testdata = "abcdefg" * 10
    350         result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0]
    351         self.assertEqual(testdata, result)
    352 
    353 class DateTimeTests(unittest.TestCase):
    354     def setUp(self):
    355         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    356         self.cur = self.con.cursor()
    357         self.cur.execute("create table test(d date, ts timestamp)")
    358 
    359     def tearDown(self):
    360         self.cur.close()
    361         self.con.close()
    362 
    363     def CheckSqliteDate(self):
    364         d = sqlite.Date(2004, 2, 14)
    365         self.cur.execute("insert into test(d) values (?)", (d,))
    366         self.cur.execute("select d from test")
    367         d2 = self.cur.fetchone()[0]
    368         self.assertEqual(d, d2)
    369 
    370     def CheckSqliteTimestamp(self):
    371         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
    372         self.cur.execute("insert into test(ts) values (?)", (ts,))
    373         self.cur.execute("select ts from test")
    374         ts2 = self.cur.fetchone()[0]
    375         self.assertEqual(ts, ts2)
    376 
    377     def CheckSqlTimestamp(self):
    378         # The date functions are only available in SQLite version 3.1 or later
    379         if sqlite.sqlite_version_info < (3, 1):
    380             return
    381 
    382         # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time.
    383         now = datetime.datetime.now()
    384         self.cur.execute("insert into test(ts) values (current_timestamp)")
    385         self.cur.execute("select ts from test")
    386         ts = self.cur.fetchone()[0]
    387         self.assertEqual(type(ts), datetime.datetime)
    388         self.assertEqual(ts.year, now.year)
    389 
    390     def CheckDateTimeSubSeconds(self):
    391         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
    392         self.cur.execute("insert into test(ts) values (?)", (ts,))
    393         self.cur.execute("select ts from test")
    394         ts2 = self.cur.fetchone()[0]
    395         self.assertEqual(ts, ts2)
    396 
    397     def CheckDateTimeSubSecondsFloatingPoint(self):
    398         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
    399         self.cur.execute("insert into test(ts) values (?)", (ts,))
    400         self.cur.execute("select ts from test")
    401         ts2 = self.cur.fetchone()[0]
    402         self.assertEqual(ts, ts2)
    403 
    404 def suite():
    405     sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check")
    406     decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check")
    407     colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check")
    408     adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check")
    409     bin_suite = unittest.makeSuite(BinaryConverterTests, "Check")
    410     date_suite = unittest.makeSuite(DateTimeTests, "Check")
    411     return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite))
    412 
    413 def test():
    414     runner = unittest.TextTestRunner()
    415     runner.run(suite())
    416 
    417 if __name__ == "__main__":
    418     test()
    419