Home | History | Annotate | Download | only in test
      1 """TestCases for exercising a Recno DB.
      2 """
      3 
      4 import os, sys
      5 import errno
      6 from pprint import pprint
      7 import unittest
      8 
      9 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
     10 
     11 letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
     12 
     13 
     14 #----------------------------------------------------------------------
     15 
     16 class SimpleRecnoTestCase(unittest.TestCase):
     17     if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
     18             (sys.version_info < (3, 2))) :
     19         def assertIsInstance(self, obj, datatype, msg=None) :
     20             return self.assertEqual(type(obj), datatype, msg=msg)
     21         def assertGreaterEqual(self, a, b, msg=None) :
     22             return self.assertTrue(a>=b, msg=msg)
     23 
     24 
     25     def setUp(self):
     26         self.filename = get_new_database_path()
     27         self.homeDir = None
     28 
     29     def tearDown(self):
     30         test_support.unlink(self.filename)
     31         if self.homeDir:
     32             test_support.rmtree(self.homeDir)
     33 
     34     def test01_basic(self):
     35         d = db.DB()
     36 
     37         get_returns_none = d.set_get_returns_none(2)
     38         d.set_get_returns_none(get_returns_none)
     39 
     40         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
     41 
     42         for x in letters:
     43             recno = d.append(x * 60)
     44             self.assertIsInstance(recno, int)
     45             self.assertGreaterEqual(recno, 1)
     46             if verbose:
     47                 print recno,
     48 
     49         if verbose: print
     50 
     51         stat = d.stat()
     52         if verbose:
     53             pprint(stat)
     54 
     55         for recno in range(1, len(d)+1):
     56             data = d[recno]
     57             if verbose:
     58                 print data
     59 
     60             self.assertIsInstance(data, str)
     61             self.assertEqual(data, d.get(recno))
     62 
     63         try:
     64             data = d[0]  # This should raise a KeyError!?!?!
     65         except db.DBInvalidArgError, val:
     66             if sys.version_info < (2, 6) :
     67                 self.assertEqual(val[0], db.EINVAL)
     68             else :
     69                 self.assertEqual(val.args[0], db.EINVAL)
     70             if verbose: print val
     71         else:
     72             self.fail("expected exception")
     73 
     74         # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
     75         try:
     76             d.has_key(0)
     77         except db.DBError, val:
     78             pass
     79         else:
     80             self.fail("has_key did not raise a proper exception")
     81 
     82         try:
     83             data = d[100]
     84         except KeyError:
     85             pass
     86         else:
     87             self.fail("expected exception")
     88 
     89         try:
     90             data = d.get(100)
     91         except db.DBNotFoundError, val:
     92             if get_returns_none:
     93                 self.fail("unexpected exception")
     94         else:
     95             self.assertEqual(data, None)
     96 
     97         keys = d.keys()
     98         if verbose:
     99             print keys
    100         self.assertIsInstance(keys, list)
    101         self.assertIsInstance(keys[0], int)
    102         self.assertEqual(len(keys), len(d))
    103 
    104         items = d.items()
    105         if verbose:
    106             pprint(items)
    107         self.assertIsInstance(items, list)
    108         self.assertIsInstance(items[0], tuple)
    109         self.assertEqual(len(items[0]), 2)
    110         self.assertIsInstance(items[0][0], int)
    111         self.assertIsInstance(items[0][1], str)
    112         self.assertEqual(len(items), len(d))
    113 
    114         self.assertTrue(d.has_key(25))
    115 
    116         del d[25]
    117         self.assertFalse(d.has_key(25))
    118 
    119         d.delete(13)
    120         self.assertFalse(d.has_key(13))
    121 
    122         data = d.get_both(26, "z" * 60)
    123         self.assertEqual(data, "z" * 60, 'was %r' % data)
    124         if verbose:
    125             print data
    126 
    127         fd = d.fd()
    128         if verbose:
    129             print fd
    130 
    131         c = d.cursor()
    132         rec = c.first()
    133         while rec:
    134             if verbose:
    135                 print rec
    136             rec = c.next()
    137 
    138         c.set(50)
    139         rec = c.current()
    140         if verbose:
    141             print rec
    142 
    143         c.put(-1, "a replacement record", db.DB_CURRENT)
    144 
    145         c.set(50)
    146         rec = c.current()
    147         self.assertEqual(rec, (50, "a replacement record"))
    148         if verbose:
    149             print rec
    150 
    151         rec = c.set_range(30)
    152         if verbose:
    153             print rec
    154 
    155         # test that non-existent key lookups work (and that
    156         # DBC_set_range doesn't have a memleak under valgrind)
    157         rec = c.set_range(999999)
    158         self.assertEqual(rec, None)
    159         if verbose:
    160             print rec
    161 
    162         c.close()
    163         d.close()
    164 
    165         d = db.DB()
    166         d.open(self.filename)
    167         c = d.cursor()
    168 
    169         # put a record beyond the consecutive end of the recno's
    170         d[100] = "way out there"
    171         self.assertEqual(d[100], "way out there")
    172 
    173         try:
    174             data = d[99]
    175         except KeyError:
    176             pass
    177         else:
    178             self.fail("expected exception")
    179 
    180         try:
    181             d.get(99)
    182         except db.DBKeyEmptyError, val:
    183             if get_returns_none:
    184                 self.fail("unexpected DBKeyEmptyError exception")
    185             else:
    186                 if sys.version_info < (2, 6) :
    187                     self.assertEqual(val[0], db.DB_KEYEMPTY)
    188                 else :
    189                     self.assertEqual(val.args[0], db.DB_KEYEMPTY)
    190                 if verbose: print val
    191         else:
    192             if not get_returns_none:
    193                 self.fail("expected exception")
    194 
    195         rec = c.set(40)
    196         while rec:
    197             if verbose:
    198                 print rec
    199             rec = c.next()
    200 
    201         c.close()
    202         d.close()
    203 
    204     def test02_WithSource(self):
    205         """
    206         A Recno file that is given a "backing source file" is essentially a
    207         simple ASCII file.  Normally each record is delimited by \n and so is
    208         just a line in the file, but you can set a different record delimiter
    209         if needed.
    210         """
    211         homeDir = get_new_environment_path()
    212         self.homeDir = homeDir
    213         source = os.path.join(homeDir, 'test_recno.txt')
    214         if not os.path.isdir(homeDir):
    215             os.mkdir(homeDir)
    216         f = open(source, 'w') # create the file
    217         f.close()
    218 
    219         d = db.DB()
    220         # This is the default value, just checking if both int
    221         d.set_re_delim(0x0A)
    222         d.set_re_delim('\n')  # and char can be used...
    223         d.set_re_source(source)
    224         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
    225 
    226         data = "The quick brown fox jumped over the lazy dog".split()
    227         for datum in data:
    228             d.append(datum)
    229         d.sync()
    230         d.close()
    231 
    232         # get the text from the backing source
    233         f = open(source, 'r')
    234         text = f.read()
    235         f.close()
    236         text = text.strip()
    237         if verbose:
    238             print text
    239             print data
    240             print text.split('\n')
    241 
    242         self.assertEqual(text.split('\n'), data)
    243 
    244         # open as a DB again
    245         d = db.DB()
    246         d.set_re_source(source)
    247         d.open(self.filename, db.DB_RECNO)
    248 
    249         d[3] = 'reddish-brown'
    250         d[8] = 'comatose'
    251 
    252         d.sync()
    253         d.close()
    254 
    255         f = open(source, 'r')
    256         text = f.read()
    257         f.close()
    258         text = text.strip()
    259         if verbose:
    260             print text
    261             print text.split('\n')
    262 
    263         self.assertEqual(text.split('\n'),
    264            "The quick reddish-brown fox jumped over the comatose dog".split())
    265 
    266     def test03_FixedLength(self):
    267         d = db.DB()
    268         d.set_re_len(40)  # fixed length records, 40 bytes long
    269         d.set_re_pad('-') # sets the pad character...
    270         d.set_re_pad(45)  # ...test both int and char
    271         d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
    272 
    273         for x in letters:
    274             d.append(x * 35)    # These will be padded
    275 
    276         d.append('.' * 40)      # this one will be exact
    277 
    278         try:                    # this one will fail
    279             d.append('bad' * 20)
    280         except db.DBInvalidArgError, val:
    281             if sys.version_info < (2, 6) :
    282                 self.assertEqual(val[0], db.EINVAL)
    283             else :
    284                 self.assertEqual(val.args[0], db.EINVAL)
    285             if verbose: print val
    286         else:
    287             self.fail("expected exception")
    288 
    289         c = d.cursor()
    290         rec = c.first()
    291         while rec:
    292             if verbose:
    293                 print rec
    294             rec = c.next()
    295 
    296         c.close()
    297         d.close()
    298 
    299     def test04_get_size_empty(self) :
    300         d = db.DB()
    301         d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE)
    302 
    303         row_id = d.append(' ')
    304         self.assertEqual(1, d.get_size(key=row_id))
    305         row_id = d.append('')
    306         self.assertEqual(0, d.get_size(key=row_id))
    307 
    308 
    309 
    310 
    311 
    312 #----------------------------------------------------------------------
    313 
    314 
    315 def test_suite():
    316     return unittest.makeSuite(SimpleRecnoTestCase)
    317 
    318 
    319 if __name__ == '__main__':
    320     unittest.main(defaultTest='test_suite')
    321