Home | History | Annotate | Download | only in test
      1 #-----------------------------------------------------------------------
      2 # A test suite for the table interface built on bsddb.db
      3 #-----------------------------------------------------------------------
      4 #
      5 # Copyright (C) 2000, 2001 by Autonomous Zone Industries
      6 # Copyright (C) 2002 Gregory P. Smith
      7 #
      8 # March 20, 2000
      9 #
     10 # License:      This is free software.  You may use this software for any
     11 #               purpose including modification/redistribution, so long as
     12 #               this header remains intact and that you do not claim any
     13 #               rights of ownership or authorship of this software.  This
     14 #               software has been tested, but no warranty is expressed or
     15 #               implied.
     16 #
     17 #   --  Gregory P. Smith <greg (at] krypto.org>
     18 #
     19 # $Id$
     20 
     21 import os, re, sys
     22 
     23 if sys.version_info[0] < 3 :
     24     try:
     25         import cPickle
     26         pickle = cPickle
     27     except ImportError:
     28         import pickle
     29 else :
     30     import pickle
     31 
     32 import unittest
     33 from test_all import db, dbtables, test_support, verbose, \
     34         get_new_environment_path, get_new_database_path
     35 
     36 #----------------------------------------------------------------------
     37 
     38 class TableDBTestCase(unittest.TestCase):
     39     db_name = 'test-table.db'
     40 
     41     def setUp(self):
     42         import sys
     43         if sys.version_info[0] >= 3 :
     44             from test_all import do_proxy_db_py3k
     45             self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
     46 
     47         self.testHomeDir = get_new_environment_path()
     48         self.tdb = dbtables.bsdTableDB(
     49             filename='tabletest.db', dbhome=self.testHomeDir, create=1)
     50 
     51     def tearDown(self):
     52         self.tdb.close()
     53         import sys
     54         if sys.version_info[0] >= 3 :
     55             from test_all import do_proxy_db_py3k
     56             do_proxy_db_py3k(self._flag_proxy_db_py3k)
     57         test_support.rmtree(self.testHomeDir)
     58 
     59     def test01(self):
     60         tabname = "test01"
     61         colname = 'cool numbers'
     62         try:
     63             self.tdb.Drop(tabname)
     64         except dbtables.TableDBError:
     65             pass
     66         self.tdb.CreateTable(tabname, [colname])
     67         import sys
     68         if sys.version_info[0] < 3 :
     69             self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 1)})
     70         else :
     71             self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159,
     72                 1).decode("iso8859-1")})  # 8 bits
     73 
     74         if verbose:
     75             self.tdb._db_print()
     76 
     77         values = self.tdb.Select(
     78             tabname, [colname], conditions={colname: None})
     79 
     80         import sys
     81         if sys.version_info[0] < 3 :
     82             colval = pickle.loads(values[0][colname])
     83         else :
     84             colval = pickle.loads(bytes(values[0][colname], "iso8859-1"))
     85         self.assertTrue(colval > 3.141)
     86         self.assertTrue(colval < 3.142)
     87 
     88 
     89     def test02(self):
     90         tabname = "test02"
     91         col0 = 'coolness factor'
     92         col1 = 'but can it fly?'
     93         col2 = 'Species'
     94 
     95         import sys
     96         if sys.version_info[0] < 3 :
     97             testinfo = [
     98                 {col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'},
     99                 {col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'},
    100                 {col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'}
    101             ]
    102         else :
    103             testinfo = [
    104                 {col0: pickle.dumps(8, 1).decode("iso8859-1"),
    105                     col1: 'no', col2: 'Penguin'},
    106                 {col0: pickle.dumps(-1, 1).decode("iso8859-1"),
    107                     col1: 'no', col2: 'Turkey'},
    108                 {col0: pickle.dumps(9, 1).decode("iso8859-1"),
    109                     col1: 'yes', col2: 'SR-71A Blackbird'}
    110             ]
    111 
    112         try:
    113             self.tdb.Drop(tabname)
    114         except dbtables.TableDBError:
    115             pass
    116         self.tdb.CreateTable(tabname, [col0, col1, col2])
    117         for row in testinfo :
    118             self.tdb.Insert(tabname, row)
    119 
    120         import sys
    121         if sys.version_info[0] < 3 :
    122             values = self.tdb.Select(tabname, [col2],
    123                 conditions={col0: lambda x: pickle.loads(x) >= 8})
    124         else :
    125             values = self.tdb.Select(tabname, [col2],
    126                 conditions={col0: lambda x:
    127                     pickle.loads(bytes(x, "iso8859-1")) >= 8})
    128 
    129         self.assertEqual(len(values), 2)
    130         if values[0]['Species'] == 'Penguin' :
    131             self.assertEqual(values[1]['Species'], 'SR-71A Blackbird')
    132         elif values[0]['Species'] == 'SR-71A Blackbird' :
    133             self.assertEqual(values[1]['Species'], 'Penguin')
    134         else :
    135             if verbose:
    136                 print "values= %r" % (values,)
    137             raise RuntimeError("Wrong values returned!")
    138 
    139     def test03(self):
    140         tabname = "test03"
    141         try:
    142             self.tdb.Drop(tabname)
    143         except dbtables.TableDBError:
    144             pass
    145         if verbose:
    146             print '...before CreateTable...'
    147             self.tdb._db_print()
    148         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    149         if verbose:
    150             print '...after CreateTable...'
    151             self.tdb._db_print()
    152         self.tdb.Drop(tabname)
    153         if verbose:
    154             print '...after Drop...'
    155             self.tdb._db_print()
    156         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    157 
    158         try:
    159             self.tdb.Insert(tabname,
    160                             {'a': "",
    161                              'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
    162                              'f': "Zero"})
    163             self.fail('Expected an exception')
    164         except dbtables.TableDBError:
    165             pass
    166 
    167         try:
    168             self.tdb.Select(tabname, [], conditions={'foo': '123'})
    169             self.fail('Expected an exception')
    170         except dbtables.TableDBError:
    171             pass
    172 
    173         self.tdb.Insert(tabname,
    174                         {'a': '42',
    175                          'b': "bad",
    176                          'c': "meep",
    177                          'e': 'Fuzzy wuzzy was a bear'})
    178         self.tdb.Insert(tabname,
    179                         {'a': '581750',
    180                          'b': "good",
    181                          'd': "bla",
    182                          'c': "black",
    183                          'e': 'fuzzy was here'})
    184         self.tdb.Insert(tabname,
    185                         {'a': '800000',
    186                          'b': "good",
    187                          'd': "bla",
    188                          'c': "black",
    189                          'e': 'Fuzzy wuzzy is a bear'})
    190 
    191         if verbose:
    192             self.tdb._db_print()
    193 
    194         # this should return two rows
    195         values = self.tdb.Select(tabname, ['b', 'a', 'd'],
    196             conditions={'e': re.compile('wuzzy').search,
    197                         'a': re.compile('^[0-9]+$').match})
    198         self.assertEqual(len(values), 2)
    199 
    200         # now lets delete one of them and try again
    201         self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
    202         values = self.tdb.Select(
    203             tabname, ['a', 'd', 'b'],
    204             conditions={'e': dbtables.PrefixCond('Fuzzy')})
    205         self.assertEqual(len(values), 1)
    206         self.assertEqual(values[0]['d'], None)
    207 
    208         values = self.tdb.Select(tabname, ['b'],
    209             conditions={'c': lambda c: c == 'meep'})
    210         self.assertEqual(len(values), 1)
    211         self.assertEqual(values[0]['b'], "bad")
    212 
    213 
    214     def test04_MultiCondSelect(self):
    215         tabname = "test04_MultiCondSelect"
    216         try:
    217             self.tdb.Drop(tabname)
    218         except dbtables.TableDBError:
    219             pass
    220         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
    221 
    222         try:
    223             self.tdb.Insert(tabname,
    224                             {'a': "",
    225                              'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
    226                              'f': "Zero"})
    227             self.fail('Expected an exception')
    228         except dbtables.TableDBError:
    229             pass
    230 
    231         self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
    232                                   'e': "E"})
    233         self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
    234                                   'e': "-E"})
    235         self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
    236                                   'e': "E-"})
    237 
    238         if verbose:
    239             self.tdb._db_print()
    240 
    241         # This select should return 0 rows.  it is designed to test
    242         # the bug identified and fixed in sourceforge bug # 590449
    243         # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
    244         # and supplying a fix!!  This one caused many headaches to say
    245         # the least...)
    246         values = self.tdb.Select(tabname, ['b', 'a', 'd'],
    247             conditions={'e': dbtables.ExactCond('E'),
    248                         'a': dbtables.ExactCond('A'),
    249                         'd': dbtables.PrefixCond('-')
    250                        } )
    251         self.assertEqual(len(values), 0, values)
    252 
    253 
    254     def test_CreateOrExtend(self):
    255         tabname = "test_CreateOrExtend"
    256 
    257         self.tdb.CreateOrExtendTable(
    258             tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])
    259         try:
    260             self.tdb.Insert(tabname,
    261                             {'taste': 'crap',
    262                              'filling': 'no',
    263                              'is it Guinness?': 'no'})
    264             self.fail("Insert should've failed due to bad column name")
    265         except:
    266             pass
    267         self.tdb.CreateOrExtendTable(tabname,
    268                                      ['name', 'taste', 'is it Guinness?'])
    269 
    270         # these should both succeed as the table should contain the union of both sets of columns.
    271         self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
    272                                   'is it Guinness?': 'no'})
    273         self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
    274                                   'is it Guinness?': 'yes',
    275                                   'name': 'Guinness'})
    276 
    277 
    278     def test_CondObjs(self):
    279         tabname = "test_CondObjs"
    280 
    281         self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])
    282 
    283         self.tdb.Insert(tabname, {'a': "the letter A",
    284                                   'b': "the letter B",
    285                                   'c': "is for cookie"})
    286         self.tdb.Insert(tabname, {'a': "is for aardvark",
    287                                   'e': "the letter E",
    288                                   'c': "is for cookie",
    289                                   'd': "is for dog"})
    290         self.tdb.Insert(tabname, {'a': "the letter A",
    291                                   'e': "the letter E",
    292                                   'c': "is for cookie",
    293                                   'p': "is for Python"})
    294 
    295         values = self.tdb.Select(
    296             tabname, ['p', 'e'],
    297             conditions={'e': dbtables.PrefixCond('the l')})
    298         self.assertEqual(len(values), 2, values)
    299         self.assertEqual(values[0]['e'], values[1]['e'], values)
    300         self.assertNotEqual(values[0]['p'], values[1]['p'], values)
    301 
    302         values = self.tdb.Select(
    303             tabname, ['d', 'a'],
    304             conditions={'a': dbtables.LikeCond('%aardvark%')})
    305         self.assertEqual(len(values), 1, values)
    306         self.assertEqual(values[0]['d'], "is for dog", values)
    307         self.assertEqual(values[0]['a'], "is for aardvark", values)
    308 
    309         values = self.tdb.Select(tabname, None,
    310                                  {'b': dbtables.Cond(),
    311                                   'e':dbtables.LikeCond('%letter%'),
    312                                   'a':dbtables.PrefixCond('is'),
    313                                   'd':dbtables.ExactCond('is for dog'),
    314                                   'c':dbtables.PrefixCond('is for'),
    315                                   'p':lambda s: not s})
    316         self.assertEqual(len(values), 1, values)
    317         self.assertEqual(values[0]['d'], "is for dog", values)
    318         self.assertEqual(values[0]['a'], "is for aardvark", values)
    319 
    320     def test_Delete(self):
    321         tabname = "test_Delete"
    322         self.tdb.CreateTable(tabname, ['x', 'y', 'z'])
    323 
    324         # prior to 2001-05-09 there was a bug where Delete() would
    325         # fail if it encountered any rows that did not have values in
    326         # every column.
    327         # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff (at] nic.fi)
    328         self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'})
    329         self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'})
    330 
    331         self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
    332         values = self.tdb.Select(tabname, ['y'],
    333                                  conditions={'x': dbtables.PrefixCond('X')})
    334         self.assertEqual(len(values), 0)
    335 
    336     def test_Modify(self):
    337         tabname = "test_Modify"
    338         self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])
    339 
    340         self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
    341                                   'Type': 'Word', 'Access': '8'})
    342         self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
    343         self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})
    344 
    345         def set_type(type):
    346             if type is None:
    347                 return 'MP3'
    348             return type
    349 
    350         def increment_access(count):
    351             return str(int(count)+1)
    352 
    353         def remove_value(value):
    354             return None
    355 
    356         self.tdb.Modify(tabname,
    357                         conditions={'Access': dbtables.ExactCond('0')},
    358                         mappings={'Access': remove_value})
    359         self.tdb.Modify(tabname,
    360                         conditions={'Name': dbtables.LikeCond('%MP3%')},
    361                         mappings={'Type': set_type})
    362         self.tdb.Modify(tabname,
    363                         conditions={'Name': dbtables.LikeCond('%')},
    364                         mappings={'Access': increment_access})
    365 
    366         try:
    367             self.tdb.Modify(tabname,
    368                             conditions={'Name': dbtables.LikeCond('%')},
    369                             mappings={'Access': 'What is your quest?'})
    370         except TypeError:
    371             # success, the string value in mappings isn't callable
    372             pass
    373         else:
    374             raise RuntimeError, "why was TypeError not raised for bad callable?"
    375 
    376         # Delete key in select conditions
    377         values = self.tdb.Select(
    378             tabname, None,
    379             conditions={'Type': dbtables.ExactCond('Unknown')})
    380         self.assertEqual(len(values), 1, values)
    381         self.assertEqual(values[0]['Name'], None, values)
    382         self.assertEqual(values[0]['Access'], None, values)
    383 
    384         # Modify value by select conditions
    385         values = self.tdb.Select(
    386             tabname, None,
    387             conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
    388         self.assertEqual(len(values), 1, values)
    389         self.assertEqual(values[0]['Type'], "MP3", values)
    390         self.assertEqual(values[0]['Access'], "2", values)
    391 
    392         # Make sure change applied only to select conditions
    393         values = self.tdb.Select(
    394             tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
    395         self.assertEqual(len(values), 1, values)
    396         self.assertEqual(values[0]['Type'], "Word", values)
    397         self.assertEqual(values[0]['Access'], "9", values)
    398 
    399 
    400 def test_suite():
    401     suite = unittest.TestSuite()
    402     suite.addTest(unittest.makeSuite(TableDBTestCase))
    403     return suite
    404 
    405 
    406 if __name__ == '__main__':
    407     unittest.main(defaultTest='test_suite')
    408