Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 #
      3 #-----------------------------------------------------------------------
      4 # A test suite for the table interface built on bsddb.db
      5 #-----------------------------------------------------------------------
      6 #
      7 # Copyright (C) 2000, 2001 by Autonomous Zone Industries
      8 # Copyright (C) 2002 Gregory P. Smith
      9 #
     10 # March 20, 2000
     11 #
     12 # License:      This is free software.  You may use this software for any
     13 #               purpose including modification/redistribution, so long as
     14 #               this header remains intact and that you do not claim any
     15 #               rights of ownership or authorship of this software.  This
     16 #               software has been tested, but no warranty is expressed or
     17 #               implied.
     18 #
     19 #   --  Gregory P. Smith <greg (at] krypto.org>
     20 #
     21 # $Id$
     22 
     23 import os, re, sys
     24 
     25 if sys.version_info[0] < 3 :
     26     try:
     27         import cPickle
     28         pickle = cPickle
     29     except ImportError:
     30         import pickle
     31 else :
     32     import pickle
     33 
     34 import unittest
     35 from test_all import db, dbtables, test_support, verbose, \
     36         get_new_environment_path, get_new_database_path
     37 
     38 #----------------------------------------------------------------------
     39 
     40 class TableDBTestCase(unittest.TestCase):
     41     db_name = 'test-table.db'
     42 
     43     def setUp(self):
     44         import sys
     45         if sys.version_info[0] >= 3 :
     46             from test_all import do_proxy_db_py3k
     47             self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
     48 
     49         self.testHomeDir = get_new_environment_path()
     50         self.tdb = dbtables.bsdTableDB(
     51             filename='tabletest.db', dbhome=self.testHomeDir, create=1)
     52 
     53     def tearDown(self):
     54         self.tdb.close()
     55         import sys
     56         if sys.version_info[0] >= 3 :
     57             from test_all import do_proxy_db_py3k
     58             do_proxy_db_py3k(self._flag_proxy_db_py3k)
     59         test_support.rmtree(self.testHomeDir)
     60 
     61     def test01(self):
     62         tabname = "test01"
     63         colname = 'cool numbers'
     64         try:
     65             self.tdb.Drop(tabname)
     66         except dbtables.TableDBError:
     67             pass
     68         self.tdb.CreateTable(tabname, [colname])
     69         import sys
     70         if sys.version_info[0] < 3 :
     71             self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 1)})
     72         else :
     73             self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159,
     74                 1).decode("iso8859-1")})  # 8 bits
     75 
     76         if verbose:
     77             self.tdb._db_print()
     78 
     79         values = self.tdb.Select(
     80             tabname, [colname], conditions={colname: None})
     81 
     82         import sys
     83         if sys.version_info[0] < 3 :
     84             colval = pickle.loads(values[0][colname])
     85         else :
     86             colval = pickle.loads(bytes(values[0][colname], "iso8859-1"))
     87         self.assertTrue(colval > 3.141)
     88         self.assertTrue(colval < 3.142)
     89 
     90 
     91     def test02(self):
     92         tabname = "test02"
     93         col0 = 'coolness factor'
     94         col1 = 'but can it fly?'
     95         col2 = 'Species'
     96 
     97         import sys
     98         if sys.version_info[0] < 3 :
     99             testinfo = [
    100                 {col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'},
    101                 {col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'},
    102                 {col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'}
    103             ]
    104         else :
    105             testinfo = [
    106                 {col0: pickle.dumps(8, 1).decode("iso8859-1"),
    107                     col1: 'no', col2: 'Penguin'},
    108                 {col0: pickle.dumps(-1, 1).decode("iso8859-1"),
    109                     col1: 'no', col2: 'Turkey'},
    110                 {col0: pickle.dumps(9, 1).decode("iso8859-1"),
    111                     col1: 'yes', col2: 'SR-71A Blackbird'}
    112             ]
    113 
    114         try:
    115             self.tdb.Drop(tabname)
    116         except dbtables.TableDBError:
    117             pass
    118         self.tdb.CreateTable(tabname, [col0, col1, col2])
    119         for row in testinfo :
    120             self.tdb.Insert(tabname, row)
    121 
    122         import sys
    123         if sys.version_info[0] < 3 :
    124             values = self.tdb.Select(tabname, [col2],
    125                 conditions={col0: lambda x: pickle.loads(x) >= 8})
    126         else :
    127             values = self.tdb.Select(tabname, [col2],
    128                 conditions={col0: lambda x:
    129                     pickle.loads(bytes(x, "iso8859-1")) >= 8})
    130 
    131         self.assertEqual(len(values), 2)
    132         if values[0]['Species'] == 'Penguin' :
    133             self.assertEqual(values[1]['Species'], 'SR-71A Blackbird')
    134         elif values[0]['Species'] == 'SR-71A Blackbird' :
    135             self.assertEqual(values[1]['Species'], 'Penguin')
    136         else :
    137             if verbose:
    138                 print "values= %r" % (values,)
    139             raise RuntimeError("Wrong values returned!")
    140 
    141     def test03(self):
    142         tabname = "test03"
    143         try:
    144             self.tdb.Drop(tabname)
    145         except dbtables.TableDBError:
    146             pass
    147         if verbose:
    148             print '...before CreateTable...'
    149             self.tdb._db_print()
    150         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    151         if verbose:
    152             print '...after CreateTable...'
    153             self.tdb._db_print()
    154         self.tdb.Drop(tabname)
    155         if verbose:
    156             print '...after Drop...'
    157             self.tdb._db_print()
    158         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    159 
    160         try:
    161             self.tdb.Insert(tabname,
    162                             {'a': "",
    163                              'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
    164                              'f': "Zero"})
    165             self.fail('Expected an exception')
    166         except dbtables.TableDBError:
    167             pass
    168 
    169         try:
    170             self.tdb.Select(tabname, [], conditions={'foo': '123'})
    171             self.fail('Expected an exception')
    172         except dbtables.TableDBError:
    173             pass
    174 
    175         self.tdb.Insert(tabname,
    176                         {'a': '42',
    177                          'b': "bad",
    178                          'c': "meep",
    179                          'e': 'Fuzzy wuzzy was a bear'})
    180         self.tdb.Insert(tabname,
    181                         {'a': '581750',
    182                          'b': "good",
    183                          'd': "bla",
    184                          'c': "black",
    185                          'e': 'fuzzy was here'})
    186         self.tdb.Insert(tabname,
    187                         {'a': '800000',
    188                          'b': "good",
    189                          'd': "bla",
    190                          'c': "black",
    191                          'e': 'Fuzzy wuzzy is a bear'})
    192 
    193         if verbose:
    194             self.tdb._db_print()
    195 
    196         # this should return two rows
    197         values = self.tdb.Select(tabname, ['b', 'a', 'd'],
    198             conditions={'e': re.compile('wuzzy').search,
    199                         'a': re.compile('^[0-9]+$').match})
    200         self.assertEqual(len(values), 2)
    201 
    202         # now lets delete one of them and try again
    203         self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
    204         values = self.tdb.Select(
    205             tabname, ['a', 'd', 'b'],
    206             conditions={'e': dbtables.PrefixCond('Fuzzy')})
    207         self.assertEqual(len(values), 1)
    208         self.assertEqual(values[0]['d'], None)
    209 
    210         values = self.tdb.Select(tabname, ['b'],
    211             conditions={'c': lambda c: c == 'meep'})
    212         self.assertEqual(len(values), 1)
    213         self.assertEqual(values[0]['b'], "bad")
    214 
    215 
    216     def test04_MultiCondSelect(self):
    217         tabname = "test04_MultiCondSelect"
    218         try:
    219             self.tdb.Drop(tabname)
    220         except dbtables.TableDBError:
    221             pass
    222         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    223 
    224         try:
    225             self.tdb.Insert(tabname,
    226                             {'a': "",
    227                              'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
    228                              'f': "Zero"})
    229             self.fail('Expected an exception')
    230         except dbtables.TableDBError:
    231             pass
    232 
    233         self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
    234                                   'e': "E"})
    235         self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
    236                                   'e': "-E"})
    237         self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
    238                                   'e': "E-"})
    239 
    240         if verbose:
    241             self.tdb._db_print()
    242 
    243         # This select should return 0 rows.  it is designed to test
    244         # the bug identified and fixed in sourceforge bug # 590449
    245         # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
    246         # and supplying a fix!!  This one caused many headaches to say
    247         # the least...)
    248         values = self.tdb.Select(tabname, ['b', 'a', 'd'],
    249             conditions={'e': dbtables.ExactCond('E'),
    250                         'a': dbtables.ExactCond('A'),
    251                         'd': dbtables.PrefixCond('-')
    252                        } )
    253         self.assertEqual(len(values), 0, values)
    254 
    255 
    256     def test_CreateOrExtend(self):
    257         tabname = "test_CreateOrExtend"
    258 
    259         self.tdb.CreateOrExtendTable(
    260             tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])
    261         try:
    262             self.tdb.Insert(tabname,
    263                             {'taste': 'crap',
    264                              'filling': 'no',
    265                              'is it Guinness?': 'no'})
    266             self.fail("Insert should've failed due to bad column name")
    267         except:
    268             pass
    269         self.tdb.CreateOrExtendTable(tabname,
    270                                      ['name', 'taste', 'is it Guinness?'])
    271 
    272         # these should both succeed as the table should contain the union of both sets of columns.
    273         self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
    274                                   'is it Guinness?': 'no'})
    275         self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
    276                                   'is it Guinness?': 'yes',
    277                                   'name': 'Guinness'})
    278 
    279 
    280     def test_CondObjs(self):
    281         tabname = "test_CondObjs"
    282 
    283         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])
    284 
    285         self.tdb.Insert(tabname, {'a': "the letter A",
    286                                   'b': "the letter B",
    287                                   'c': "is for cookie"})
    288         self.tdb.Insert(tabname, {'a': "is for aardvark",
    289                                   'e': "the letter E",
    290                                   'c': "is for cookie",
    291                                   'd': "is for dog"})
    292         self.tdb.Insert(tabname, {'a': "the letter A",
    293                                   'e': "the letter E",
    294                                   'c': "is for cookie",
    295                                   'p': "is for Python"})
    296 
    297         values = self.tdb.Select(
    298             tabname, ['p', 'e'],
    299             conditions={'e': dbtables.PrefixCond('the l')})
    300         self.assertEqual(len(values), 2, values)
    301         self.assertEqual(values[0]['e'], values[1]['e'], values)
    302         self.assertNotEqual(values[0]['p'], values[1]['p'], values)
    303 
    304         values = self.tdb.Select(
    305             tabname, ['d', 'a'],
    306             conditions={'a': dbtables.LikeCond('%aardvark%')})
    307         self.assertEqual(len(values), 1, values)
    308         self.assertEqual(values[0]['d'], "is for dog", values)
    309         self.assertEqual(values[0]['a'], "is for aardvark", values)
    310 
    311         values = self.tdb.Select(tabname, None,
    312                                  {'b': dbtables.Cond(),
    313                                   'e':dbtables.LikeCond('%letter%'),
    314                                   'a':dbtables.PrefixCond('is'),
    315                                   'd':dbtables.ExactCond('is for dog'),
    316                                   'c':dbtables.PrefixCond('is for'),
    317                                   'p':lambda s: not s})
    318         self.assertEqual(len(values), 1, values)
    319         self.assertEqual(values[0]['d'], "is for dog", values)
    320         self.assertEqual(values[0]['a'], "is for aardvark", values)
    321 
    322     def test_Delete(self):
    323         tabname = "test_Delete"
    324         self.tdb.CreateTable(tabname, ['x', 'y', 'z'])
    325 
    326         # prior to 2001-05-09 there was a bug where Delete() would
    327         # fail if it encountered any rows that did not have values in
    328         # every column.
    329         # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff (at] nic.fi)
    330         self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'})
    331         self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'})
    332 
    333         self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
    334         values = self.tdb.Select(tabname, ['y'],
    335                                  conditions={'x': dbtables.PrefixCond('X')})
    336         self.assertEqual(len(values), 0)
    337 
    338     def test_Modify(self):
    339         tabname = "test_Modify"
    340         self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])
    341 
    342         self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
    343                                   'Type': 'Word', 'Access': '8'})
    344         self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
    345         self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})
    346 
    347         def set_type(type):
    348             if type is None:
    349                 return 'MP3'
    350             return type
    351 
    352         def increment_access(count):
    353             return str(int(count)+1)
    354 
    355         def remove_value(value):
    356             return None
    357 
    358         self.tdb.Modify(tabname,
    359                         conditions={'Access': dbtables.ExactCond('0')},
    360                         mappings={'Access': remove_value})
    361         self.tdb.Modify(tabname,
    362                         conditions={'Name': dbtables.LikeCond('%MP3%')},
    363                         mappings={'Type': set_type})
    364         self.tdb.Modify(tabname,
    365                         conditions={'Name': dbtables.LikeCond('%')},
    366                         mappings={'Access': increment_access})
    367 
    368         try:
    369             self.tdb.Modify(tabname,
    370                             conditions={'Name': dbtables.LikeCond('%')},
    371                             mappings={'Access': 'What is your quest?'})
    372         except TypeError:
    373             # success, the string value in mappings isn't callable
    374             pass
    375         else:
    376             raise RuntimeError, "why was TypeError not raised for bad callable?"
    377 
    378         # Delete key in select conditions
    379         values = self.tdb.Select(
    380             tabname, None,
    381             conditions={'Type': dbtables.ExactCond('Unknown')})
    382         self.assertEqual(len(values), 1, values)
    383         self.assertEqual(values[0]['Name'], None, values)
    384         self.assertEqual(values[0]['Access'], None, values)
    385 
    386         # Modify value by select conditions
    387         values = self.tdb.Select(
    388             tabname, None,
    389             conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
    390         self.assertEqual(len(values), 1, values)
    391         self.assertEqual(values[0]['Type'], "MP3", values)
    392         self.assertEqual(values[0]['Access'], "2", values)
    393 
    394         # Make sure change applied only to select conditions
    395         values = self.tdb.Select(
    396             tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
    397         self.assertEqual(len(values), 1, values)
    398         self.assertEqual(values[0]['Type'], "Word", values)
    399         self.assertEqual(values[0]['Access'], "9", values)
    400 
    401 
    402 def test_suite():
    403     suite = unittest.TestSuite()
    404     suite.addTest(unittest.makeSuite(TableDBTestCase))
    405     return suite
    406 
    407 
    408 if __name__ == '__main__':
    409     unittest.main(defaultTest='test_suite')
    410