Home | History | Annotate | Download | only in test
      1 """TestCases for exercising a Recno DB.
      2 """
      3 
      4 import os, sys
      5 import errno
      6 from pprint import pprint
      7 import string
      8 import unittest
      9 
     10 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
     11 
     12 
     13 #----------------------------------------------------------------------
     14 
     15 class SimpleRecnoTestCase(unittest.TestCase):
     16     if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
     17             (sys.version_info < (3, 2))) :
     18         def assertIsInstance(self, obj, datatype, msg=None) :
     19             return self.assertEqual(type(obj), datatype, msg=msg)
     20         def assertGreaterEqual(self, a, b, msg=None) :
     21             return self.assertTrue(a>=b, msg=msg)
     22 
     23 
     24     def setUp(self):
     25         self.filename = get_new_database_path()
     26         self.homeDir = None
     27 
     28     def tearDown(self):
     29         test_support.unlink(self.filename)
     30         if self.homeDir:
     31             test_support.rmtree(self.homeDir)
     32 
     33     def test01_basic(self):
     34         d = db.DB()
     35 
     36         get_returns_none = d.set_get_returns_none(2)
     37         d.set_get_returns_none(get_returns_none)
     38 
     39         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
     40 
     41         for x in string.ascii_letters:
     42             recno = d.append(x * 60)
     43             self.assertIsInstance(recno, int)
     44             self.assertGreaterEqual(recno, 1)
     45             if verbose:
     46                 print recno,
     47 
     48         if verbose: print
     49 
     50         stat = d.stat()
     51         if verbose:
     52             pprint(stat)
     53 
     54         for recno in range(1, len(d)+1):
     55             data = d[recno]
     56             if verbose:
     57                 print data
     58 
     59             self.assertIsInstance(data, str)
     60             self.assertEqual(data, d.get(recno))
     61 
     62         try:
     63             data = d[0]  # This should raise a KeyError!?!?!
     64         except db.DBInvalidArgError, val:
     65             if sys.version_info < (2, 6) :
     66                 self.assertEqual(val[0], db.EINVAL)
     67             else :
     68                 self.assertEqual(val.args[0], db.EINVAL)
     69             if verbose: print val
     70         else:
     71             self.fail("expected exception")
     72 
     73         # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
     74         try:
     75             d.has_key(0)
     76         except db.DBError, val:
     77             pass
     78         else:
     79             self.fail("has_key did not raise a proper exception")
     80 
     81         try:
     82             data = d[100]
     83         except KeyError:
     84             pass
     85         else:
     86             self.fail("expected exception")
     87 
     88         try:
     89             data = d.get(100)
     90         except db.DBNotFoundError, val:
     91             if get_returns_none:
     92                 self.fail("unexpected exception")
     93         else:
     94             self.assertEqual(data, None)
     95 
     96         keys = d.keys()
     97         if verbose:
     98             print keys
     99         self.assertIsInstance(keys, list)
    100         self.assertIsInstance(keys[0], int)
    101         self.assertEqual(len(keys), len(d))
    102 
    103         items = d.items()
    104         if verbose:
    105             pprint(items)
    106         self.assertIsInstance(items, list)
    107         self.assertIsInstance(items[0], tuple)
    108         self.assertEqual(len(items[0]), 2)
    109         self.assertIsInstance(items[0][0], int)
    110         self.assertIsInstance(items[0][1], str)
    111         self.assertEqual(len(items), len(d))
    112 
    113         self.assertTrue(d.has_key(25))
    114 
    115         del d[25]
    116         self.assertFalse(d.has_key(25))
    117 
    118         d.delete(13)
    119         self.assertFalse(d.has_key(13))
    120 
    121         data = d.get_both(26, "z" * 60)
    122         self.assertEqual(data, "z" * 60, 'was %r' % data)
    123         if verbose:
    124             print data
    125 
    126         fd = d.fd()
    127         if verbose:
    128             print fd
    129 
    130         c = d.cursor()
    131         rec = c.first()
    132         while rec:
    133             if verbose:
    134                 print rec
    135             rec = c.next()
    136 
    137         c.set(50)
    138         rec = c.current()
    139         if verbose:
    140             print rec
    141 
    142         c.put(-1, "a replacement record", db.DB_CURRENT)
    143 
    144         c.set(50)
    145         rec = c.current()
    146         self.assertEqual(rec, (50, "a replacement record"))
    147         if verbose:
    148             print rec
    149 
    150         rec = c.set_range(30)
    151         if verbose:
    152             print rec
    153 
    154         # test that non-existent key lookups work (and that
    155         # DBC_set_range doesn't have a memleak under valgrind)
    156         rec = c.set_range(999999)
    157         self.assertEqual(rec, None)
    158         if verbose:
    159             print rec
    160 
    161         c.close()
    162         d.close()
    163 
    164         d = db.DB()
    165         d.open(self.filename)
    166         c = d.cursor()
    167 
    168         # put a record beyond the consecutive end of the recno's
    169         d[100] = "way out there"
    170         self.assertEqual(d[100], "way out there")
    171 
    172         try:
    173             data = d[99]
    174         except KeyError:
    175             pass
    176         else:
    177             self.fail("expected exception")
    178 
    179         try:
    180             d.get(99)
    181         except db.DBKeyEmptyError, val:
    182             if get_returns_none:
    183                 self.fail("unexpected DBKeyEmptyError exception")
    184             else:
    185                 if sys.version_info < (2, 6) :
    186                     self.assertEqual(val[0], db.DB_KEYEMPTY)
    187                 else :
    188                     self.assertEqual(val.args[0], db.DB_KEYEMPTY)
    189                 if verbose: print val
    190         else:
    191             if not get_returns_none:
    192                 self.fail("expected exception")
    193 
    194         rec = c.set(40)
    195         while rec:
    196             if verbose:
    197                 print rec
    198             rec = c.next()
    199 
    200         c.close()
    201         d.close()
    202 
    203     def test02_WithSource(self):
    204         """
    205         A Recno file that is given a "backing source file" is essentially a
    206         simple ASCII file.  Normally each record is delimited by \n and so is
    207         just a line in the file, but you can set a different record delimiter
    208         if needed.
    209         """
    210         homeDir = get_new_environment_path()
    211         self.homeDir = homeDir
    212         source = os.path.join(homeDir, 'test_recno.txt')
    213         if not os.path.isdir(homeDir):
    214             os.mkdir(homeDir)
    215         f = open(source, 'w') # create the file
    216         f.close()
    217 
    218         d = db.DB()
    219         # This is the default value, just checking if both int
    220         d.set_re_delim(0x0A)
    221         d.set_re_delim('\n')  # and char can be used...
    222         d.set_re_source(source)
    223         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
    224 
    225         data = "The quick brown fox jumped over the lazy dog".split()
    226         for datum in data:
    227             d.append(datum)
    228         d.sync()
    229         d.close()
    230 
    231         # get the text from the backing source
    232         f = open(source, 'r')
    233         text = f.read()
    234         f.close()
    235         text = text.strip()
    236         if verbose:
    237             print text
    238             print data
    239             print text.split('\n')
    240 
    241         self.assertEqual(text.split('\n'), data)
    242 
    243         # open as a DB again
    244         d = db.DB()
    245         d.set_re_source(source)
    246         d.open(self.filename, db.DB_RECNO)
    247 
    248         d[3] = 'reddish-brown'
    249         d[8] = 'comatose'
    250 
    251         d.sync()
    252         d.close()
    253 
    254         f = open(source, 'r')
    255         text = f.read()
    256         f.close()
    257         text = text.strip()
    258         if verbose:
    259             print text
    260             print text.split('\n')
    261 
    262         self.assertEqual(text.split('\n'),
    263            "The quick reddish-brown fox jumped over the comatose dog".split())
    264 
    265     def test03_FixedLength(self):
    266         d = db.DB()
    267         d.set_re_len(40)  # fixed length records, 40 bytes long
    268         d.set_re_pad('-') # sets the pad character...
    269         d.set_re_pad(45)  # ...test both int and char
    270         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
    271 
    272         for x in string.ascii_letters:
    273             d.append(x * 35)    # These will be padded
    274 
    275         d.append('.' * 40)      # this one will be exact
    276 
    277         try:                    # this one will fail
    278             d.append('bad' * 20)
    279         except db.DBInvalidArgError, val:
    280             if sys.version_info < (2, 6) :
    281                 self.assertEqual(val[0], db.EINVAL)
    282             else :
    283                 self.assertEqual(val.args[0], db.EINVAL)
    284             if verbose: print val
    285         else:
    286             self.fail("expected exception")
    287 
    288         c = d.cursor()
    289         rec = c.first()
    290         while rec:
    291             if verbose:
    292                 print rec
    293             rec = c.next()
    294 
    295         c.close()
    296         d.close()
    297 
    298     def test04_get_size_empty(self) :
    299         d = db.DB()
    300         d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE)
    301 
    302         row_id = d.append(' ')
    303         self.assertEqual(1, d.get_size(key=row_id))
    304         row_id = d.append('')
    305         self.assertEqual(0, d.get_size(key=row_id))
    306 
    307 
    308 
    309 
    310 
    311 #----------------------------------------------------------------------
    312 
    313 
    314 def test_suite():
    315     return unittest.makeSuite(SimpleRecnoTestCase)
    316 
    317 
if __name__ == '__main__':
    # When executed as a script, run the suite built by test_suite().
    unittest.main(defaultTest='test_suite')
    320