Home | History | Annotate | Download | only in test
      1 #-*- coding: ISO-8859-1 -*-
      2 # pysqlite2/test/types.py: tests for type conversion and detection
      3 #
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
      5 #
      6 # This file is part of pysqlite.
      7 #
      8 # This software is provided 'as-is', without any express or implied
      9 # warranty.  In no event will the authors be held liable for any damages
     10 # arising from the use of this software.
     11 #
     12 # Permission is granted to anyone to use this software for any purpose,
     13 # including commercial applications, and to alter it and redistribute it
     14 # freely, subject to the following restrictions:
     15 #
     16 # 1. The origin of this software must not be misrepresented; you must not
     17 #    claim that you wrote the original software. If you use this software
     18 #    in a product, an acknowledgment in the product documentation would be
     19 #    appreciated but is not required.
     20 # 2. Altered source versions must be plainly marked as such, and must not be
     21 #    misrepresented as being the original software.
     22 # 3. This notice may not be removed or altered from any source distribution.
     23 
     24 import datetime
     25 import unittest
     26 import sqlite3 as sqlite
     27 from test import test_support
     28 try:
     29     import zlib
     30 except ImportError:
     31     zlib = None
     32 
     33 
     34 class SqliteTypeTests(unittest.TestCase):
     35     def setUp(self):
     36         self.con = sqlite.connect(":memory:")
     37         self.cur = self.con.cursor()
     38         self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
     39 
     40     def tearDown(self):
     41         self.cur.close()
     42         self.con.close()
     43 
     44     def CheckString(self):
     45         self.cur.execute("insert into test(s) values (?)", (u"sterreich",))
     46         self.cur.execute("select s from test")
     47         row = self.cur.fetchone()
     48         self.assertEqual(row[0], u"sterreich")
     49 
     50     def CheckSmallInt(self):
     51         self.cur.execute("insert into test(i) values (?)", (42,))
     52         self.cur.execute("select i from test")
     53         row = self.cur.fetchone()
     54         self.assertEqual(row[0], 42)
     55 
     56     def CheckLargeInt(self):
     57         num = 2**40
     58         self.cur.execute("insert into test(i) values (?)", (num,))
     59         self.cur.execute("select i from test")
     60         row = self.cur.fetchone()
     61         self.assertEqual(row[0], num)
     62 
     63     def CheckFloat(self):
     64         val = 3.14
     65         self.cur.execute("insert into test(f) values (?)", (val,))
     66         self.cur.execute("select f from test")
     67         row = self.cur.fetchone()
     68         self.assertEqual(row[0], val)
     69 
     70     def CheckBlob(self):
     71         with test_support.check_py3k_warnings():
     72             val = buffer("Guglhupf")
     73         self.cur.execute("insert into test(b) values (?)", (val,))
     74         self.cur.execute("select b from test")
     75         row = self.cur.fetchone()
     76         self.assertEqual(row[0], val)
     77 
     78     def CheckUnicodeExecute(self):
     79         self.cur.execute(u"select 'sterreich'")
     80         row = self.cur.fetchone()
     81         self.assertEqual(row[0], u"sterreich")
     82 
     83     def CheckNonUtf8_Default(self):
     84         try:
     85             self.cur.execute("select ?", (chr(150),))
     86             self.fail("should have raised a ProgrammingError")
     87         except sqlite.ProgrammingError:
     88             pass
     89 
     90     def CheckNonUtf8_TextFactoryString(self):
     91         orig_text_factory = self.con.text_factory
     92         try:
     93             self.con.text_factory = str
     94             self.cur.execute("select ?", (chr(150),))
     95         finally:
     96             self.con.text_factory = orig_text_factory
     97 
     98     def CheckNonUtf8_TextFactoryOptimizedUnicode(self):
     99         orig_text_factory = self.con.text_factory
    100         try:
    101             try:
    102                 self.con.text_factory = sqlite.OptimizedUnicode
    103                 self.cur.execute("select ?", (chr(150),))
    104                 self.fail("should have raised a ProgrammingError")
    105             except sqlite.ProgrammingError:
    106                 pass
    107         finally:
    108             self.con.text_factory = orig_text_factory
    109 
    110 class DeclTypesTests(unittest.TestCase):
    111     class Foo:
    112         def __init__(self, _val):
    113             self.val = _val
    114 
    115         def __cmp__(self, other):
    116             if not isinstance(other, DeclTypesTests.Foo):
    117                 raise ValueError
    118             if self.val == other.val:
    119                 return 0
    120             else:
    121                 return 1
    122 
    123         def __conform__(self, protocol):
    124             if protocol is sqlite.PrepareProtocol:
    125                 return self.val
    126             else:
    127                 return None
    128 
    129         def __str__(self):
    130             return "<%s>" % self.val
    131 
    132     def setUp(self):
    133         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    134         self.cur = self.con.cursor()
    135         self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")
    136 
    137         # override float, make them always return the same number
    138         sqlite.converters["FLOAT"] = lambda x: 47.2
    139 
    140         # and implement two custom ones
    141         sqlite.converters["BOOL"] = lambda x: bool(int(x))
    142         sqlite.converters["FOO"] = DeclTypesTests.Foo
    143         sqlite.converters["WRONG"] = lambda x: "WRONG"
    144         sqlite.converters["NUMBER"] = float
    145 
    146     def tearDown(self):
    147         del sqlite.converters["FLOAT"]
    148         del sqlite.converters["BOOL"]
    149         del sqlite.converters["FOO"]
    150         del sqlite.converters["NUMBER"]
    151         self.cur.close()
    152         self.con.close()
    153 
    154     def CheckString(self):
    155         # default
    156         self.cur.execute("insert into test(s) values (?)", ("foo",))
    157         self.cur.execute('select s as "s [WRONG]" from test')
    158         row = self.cur.fetchone()
    159         self.assertEqual(row[0], "foo")
    160 
    161     def CheckSmallInt(self):
    162         # default
    163         self.cur.execute("insert into test(i) values (?)", (42,))
    164         self.cur.execute("select i from test")
    165         row = self.cur.fetchone()
    166         self.assertEqual(row[0], 42)
    167 
    168     def CheckLargeInt(self):
    169         # default
    170         num = 2**40
    171         self.cur.execute("insert into test(i) values (?)", (num,))
    172         self.cur.execute("select i from test")
    173         row = self.cur.fetchone()
    174         self.assertEqual(row[0], num)
    175 
    176     def CheckFloat(self):
    177         # custom
    178         val = 3.14
    179         self.cur.execute("insert into test(f) values (?)", (val,))
    180         self.cur.execute("select f from test")
    181         row = self.cur.fetchone()
    182         self.assertEqual(row[0], 47.2)
    183 
    184     def CheckBool(self):
    185         # custom
    186         self.cur.execute("insert into test(b) values (?)", (False,))
    187         self.cur.execute("select b from test")
    188         row = self.cur.fetchone()
    189         self.assertEqual(row[0], False)
    190 
    191         self.cur.execute("delete from test")
    192         self.cur.execute("insert into test(b) values (?)", (True,))
    193         self.cur.execute("select b from test")
    194         row = self.cur.fetchone()
    195         self.assertEqual(row[0], True)
    196 
    197     def CheckUnicode(self):
    198         # default
    199         val = u"\xd6sterreich"
    200         self.cur.execute("insert into test(u) values (?)", (val,))
    201         self.cur.execute("select u from test")
    202         row = self.cur.fetchone()
    203         self.assertEqual(row[0], val)
    204 
    205     def CheckFoo(self):
    206         val = DeclTypesTests.Foo("bla")
    207         self.cur.execute("insert into test(foo) values (?)", (val,))
    208         self.cur.execute("select foo from test")
    209         row = self.cur.fetchone()
    210         self.assertEqual(row[0], val)
    211 
    212     def CheckUnsupportedSeq(self):
    213         class Bar: pass
    214         val = Bar()
    215         try:
    216             self.cur.execute("insert into test(f) values (?)", (val,))
    217             self.fail("should have raised an InterfaceError")
    218         except sqlite.InterfaceError:
    219             pass
    220         except:
    221             self.fail("should have raised an InterfaceError")
    222 
    223     def CheckUnsupportedDict(self):
    224         class Bar: pass
    225         val = Bar()
    226         try:
    227             self.cur.execute("insert into test(f) values (:val)", {"val": val})
    228             self.fail("should have raised an InterfaceError")
    229         except sqlite.InterfaceError:
    230             pass
    231         except:
    232             self.fail("should have raised an InterfaceError")
    233 
    234     def CheckBlob(self):
    235         # default
    236         with test_support.check_py3k_warnings():
    237             val = buffer("Guglhupf")
    238         self.cur.execute("insert into test(bin) values (?)", (val,))
    239         self.cur.execute("select bin from test")
    240         row = self.cur.fetchone()
    241         self.assertEqual(row[0], val)
    242 
    243     def CheckNumber1(self):
    244         self.cur.execute("insert into test(n1) values (5)")
    245         value = self.cur.execute("select n1 from test").fetchone()[0]
    246         # if the converter is not used, it's an int instead of a float
    247         self.assertEqual(type(value), float)
    248 
    249     def CheckNumber2(self):
    250         """Checks whether converter names are cut off at '(' characters"""
    251         self.cur.execute("insert into test(n2) values (5)")
    252         value = self.cur.execute("select n2 from test").fetchone()[0]
    253         # if the converter is not used, it's an int instead of a float
    254         self.assertEqual(type(value), float)
    255 
    256 class ColNamesTests(unittest.TestCase):
    257     def setUp(self):
    258         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    259         self.cur = self.con.cursor()
    260         self.cur.execute("create table test(x foo)")
    261 
    262         sqlite.converters["FOO"] = lambda x: "[%s]" % x
    263         sqlite.converters["BAR"] = lambda x: "<%s>" % x
    264         sqlite.converters["EXC"] = lambda x: 5 // 0
    265         sqlite.converters["B1B1"] = lambda x: "MARKER"
    266 
    267     def tearDown(self):
    268         del sqlite.converters["FOO"]
    269         del sqlite.converters["BAR"]
    270         del sqlite.converters["EXC"]
    271         del sqlite.converters["B1B1"]
    272         self.cur.close()
    273         self.con.close()
    274 
    275     def CheckDeclTypeNotUsed(self):
    276         """
    277         Assures that the declared type is not used when PARSE_DECLTYPES
    278         is not set.
    279         """
    280         self.cur.execute("insert into test(x) values (?)", ("xxx",))
    281         self.cur.execute("select x from test")
    282         val = self.cur.fetchone()[0]
    283         self.assertEqual(val, "xxx")
    284 
    285     def CheckNone(self):
    286         self.cur.execute("insert into test(x) values (?)", (None,))
    287         self.cur.execute("select x from test")
    288         val = self.cur.fetchone()[0]
    289         self.assertEqual(val, None)
    290 
    291     def CheckColName(self):
    292         self.cur.execute("insert into test(x) values (?)", ("xxx",))
    293         self.cur.execute('select x as "x [bar]" from test')
    294         val = self.cur.fetchone()[0]
    295         self.assertEqual(val, "<xxx>")
    296 
    297         # Check if the stripping of colnames works. Everything after the first
    298         # whitespace should be stripped.
    299         self.assertEqual(self.cur.description[0][0], "x")
    300 
    301     def CheckCaseInConverterName(self):
    302         self.cur.execute("""select 'other' as "x [b1b1]\"""")
    303         val = self.cur.fetchone()[0]
    304         self.assertEqual(val, "MARKER")
    305 
    306     def CheckCursorDescriptionNoRow(self):
    307         """
    308         cursor.description should at least provide the column name(s), even if
    309         no row returned.
    310         """
    311         self.cur.execute("select * from test where 0 = 1")
    312         self.assertEqual(self.cur.description[0][0], "x")
    313 
    314 class ObjectAdaptationTests(unittest.TestCase):
    315     def cast(obj):
    316         return float(obj)
    317     cast = staticmethod(cast)
    318 
    319     def setUp(self):
    320         self.con = sqlite.connect(":memory:")
    321         try:
    322             del sqlite.adapters[int]
    323         except:
    324             pass
    325         sqlite.register_adapter(int, ObjectAdaptationTests.cast)
    326         self.cur = self.con.cursor()
    327 
    328     def tearDown(self):
    329         del sqlite.adapters[(int, sqlite.PrepareProtocol)]
    330         self.cur.close()
    331         self.con.close()
    332 
    333     def CheckCasterIsUsed(self):
    334         self.cur.execute("select ?", (4,))
    335         val = self.cur.fetchone()[0]
    336         self.assertEqual(type(val), float)
    337 
    338 @unittest.skipUnless(zlib, "requires zlib")
    339 class BinaryConverterTests(unittest.TestCase):
    340     def convert(s):
    341         return zlib.decompress(s)
    342     convert = staticmethod(convert)
    343 
    344     def setUp(self):
    345         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
    346         sqlite.register_converter("bin", BinaryConverterTests.convert)
    347 
    348     def tearDown(self):
    349         self.con.close()
    350 
    351     def CheckBinaryInputForConverter(self):
    352         testdata = "abcdefg" * 10
    353         with test_support.check_py3k_warnings():
    354             result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0]
    355         self.assertEqual(testdata, result)
    356 
    357 class DateTimeTests(unittest.TestCase):
    358     def setUp(self):
    359         self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
    360         self.cur = self.con.cursor()
    361         self.cur.execute("create table test(d date, ts timestamp)")
    362 
    363     def tearDown(self):
    364         self.cur.close()
    365         self.con.close()
    366 
    367     def CheckSqliteDate(self):
    368         d = sqlite.Date(2004, 2, 14)
    369         self.cur.execute("insert into test(d) values (?)", (d,))
    370         self.cur.execute("select d from test")
    371         d2 = self.cur.fetchone()[0]
    372         self.assertEqual(d, d2)
    373 
    374     def CheckSqliteTimestamp(self):
    375         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
    376         self.cur.execute("insert into test(ts) values (?)", (ts,))
    377         self.cur.execute("select ts from test")
    378         ts2 = self.cur.fetchone()[0]
    379         self.assertEqual(ts, ts2)
    380 
    381     def CheckSqlTimestamp(self):
    382         # The date functions are only available in SQLite version 3.1 or later
    383         if sqlite.sqlite_version_info < (3, 1):
    384             return
    385 
    386         # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time.
    387         now = datetime.datetime.now()
    388         self.cur.execute("insert into test(ts) values (current_timestamp)")
    389         self.cur.execute("select ts from test")
    390         ts = self.cur.fetchone()[0]
    391         self.assertEqual(type(ts), datetime.datetime)
    392         self.assertEqual(ts.year, now.year)
    393 
    394     def CheckDateTimeSubSeconds(self):
    395         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
    396         self.cur.execute("insert into test(ts) values (?)", (ts,))
    397         self.cur.execute("select ts from test")
    398         ts2 = self.cur.fetchone()[0]
    399         self.assertEqual(ts, ts2)
    400 
    401     def CheckDateTimeSubSecondsFloatingPoint(self):
    402         ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
    403         self.cur.execute("insert into test(ts) values (?)", (ts,))
    404         self.cur.execute("select ts from test")
    405         ts2 = self.cur.fetchone()[0]
    406         self.assertEqual(ts, ts2)
    407 
    408 def suite():
    409     sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check")
    410     decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check")
    411     colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check")
    412     adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check")
    413     bin_suite = unittest.makeSuite(BinaryConverterTests, "Check")
    414     date_suite = unittest.makeSuite(DateTimeTests, "Check")
    415     return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite))
    416 
    417 def test():
    418     runner = unittest.TextTestRunner()
    419     runner.run(suite())
    420 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test()
    423