Home | History | Annotate | Download | only in test
      1 #! /usr/bin/env python
      2 """Test script for the bsddb C module by Roger E. Masse
      3    Adapted to unittest format and expanded scope by Raymond Hettinger
      4 """
      5 import os, sys
      6 import unittest
      7 from test import test_support
      8 
      9 # Skip test if _bsddb wasn't built.
     10 test_support.import_module('_bsddb')
     11 
     12 bsddb = test_support.import_module('bsddb', deprecated=True)
     13 # Just so we know it's imported:
     14 test_support.import_module('dbhash', deprecated=True)
     15 
     16 
     17 class TestBSDDB(unittest.TestCase):
     18     openflag = 'c'
     19 
     20     def setUp(self):
     21         self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
     22         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
     23         for k, v in self.d.iteritems():
     24             self.f[k] = v
     25 
     26     def tearDown(self):
     27         self.f.sync()
     28         self.f.close()
     29         if self.fname is None:
     30             return
     31         try:
     32             os.remove(self.fname)
     33         except os.error:
     34             pass
     35 
     36     def test_getitem(self):
     37         for k, v in self.d.iteritems():
     38             self.assertEqual(self.f[k], v)
     39 
     40     def test_len(self):
     41         self.assertEqual(len(self.f), len(self.d))
     42 
     43     def test_change(self):
     44         self.f['r'] = 'discovered'
     45         self.assertEqual(self.f['r'], 'discovered')
     46         self.assertIn('r', self.f.keys())
     47         self.assertIn('discovered', self.f.values())
     48 
     49     def test_close_and_reopen(self):
     50         if self.fname is None:
     51             # if we're using an in-memory only db, we can't reopen it
     52             # so finish here.
     53             return
     54         self.f.close()
     55         self.f = self.openmethod[0](self.fname, 'w')
     56         for k, v in self.d.iteritems():
     57             self.assertEqual(self.f[k], v)
     58 
     59     def assertSetEquals(self, seqn1, seqn2):
     60         self.assertEqual(set(seqn1), set(seqn2))
     61 
     62     def test_mapping_iteration_methods(self):
     63         f = self.f
     64         d = self.d
     65         self.assertSetEquals(d, f)
     66         self.assertSetEquals(d.keys(), f.keys())
     67         self.assertSetEquals(d.values(), f.values())
     68         self.assertSetEquals(d.items(), f.items())
     69         self.assertSetEquals(d.iterkeys(), f.iterkeys())
     70         self.assertSetEquals(d.itervalues(), f.itervalues())
     71         self.assertSetEquals(d.iteritems(), f.iteritems())
     72 
     73     def test_iter_while_modifying_values(self):
     74         di = iter(self.d)
     75         while 1:
     76             try:
     77                 key = di.next()
     78                 self.d[key] = 'modified '+key
     79             except StopIteration:
     80                 break
     81 
     82         # it should behave the same as a dict.  modifying values
     83         # of existing keys should not break iteration.  (adding
     84         # or removing keys should)
     85         loops_left = len(self.f)
     86         fi = iter(self.f)
     87         while 1:
     88             try:
     89                 key = fi.next()
     90                 self.f[key] = 'modified '+key
     91                 loops_left -= 1
     92             except StopIteration:
     93                 break
     94         self.assertEqual(loops_left, 0)
     95 
     96         self.test_mapping_iteration_methods()
     97 
     98     def test_iter_abort_on_changed_size(self):
     99         def DictIterAbort():
    100             di = iter(self.d)
    101             while 1:
    102                 try:
    103                     di.next()
    104                     self.d['newkey'] = 'SPAM'
    105                 except StopIteration:
    106                     break
    107         self.assertRaises(RuntimeError, DictIterAbort)
    108 
    109         def DbIterAbort():
    110             fi = iter(self.f)
    111             while 1:
    112                 try:
    113                     fi.next()
    114                     self.f['newkey'] = 'SPAM'
    115                 except StopIteration:
    116                     break
    117         self.assertRaises(RuntimeError, DbIterAbort)
    118 
    119     def test_iteritems_abort_on_changed_size(self):
    120         def DictIteritemsAbort():
    121             di = self.d.iteritems()
    122             while 1:
    123                 try:
    124                     di.next()
    125                     self.d['newkey'] = 'SPAM'
    126                 except StopIteration:
    127                     break
    128         self.assertRaises(RuntimeError, DictIteritemsAbort)
    129 
    130         def DbIteritemsAbort():
    131             fi = self.f.iteritems()
    132             while 1:
    133                 try:
    134                     key, value = fi.next()
    135                     del self.f[key]
    136                 except StopIteration:
    137                     break
    138         self.assertRaises(RuntimeError, DbIteritemsAbort)
    139 
    140     def test_iteritems_while_modifying_values(self):
    141         di = self.d.iteritems()
    142         while 1:
    143             try:
    144                 k, v = di.next()
    145                 self.d[k] = 'modified '+v
    146             except StopIteration:
    147                 break
    148 
    149         # it should behave the same as a dict.  modifying values
    150         # of existing keys should not break iteration.  (adding
    151         # or removing keys should)
    152         loops_left = len(self.f)
    153         fi = self.f.iteritems()
    154         while 1:
    155             try:
    156                 k, v = fi.next()
    157                 self.f[k] = 'modified '+v
    158                 loops_left -= 1
    159             except StopIteration:
    160                 break
    161         self.assertEqual(loops_left, 0)
    162 
    163         self.test_mapping_iteration_methods()
    164 
    165     def test_first_next_looping(self):
    166         items = [self.f.first()]
    167         for i in xrange(1, len(self.f)):
    168             items.append(self.f.next())
    169         self.assertSetEquals(items, self.d.items())
    170 
    171     def test_previous_last_looping(self):
    172         items = [self.f.last()]
    173         for i in xrange(1, len(self.f)):
    174             items.append(self.f.previous())
    175         self.assertSetEquals(items, self.d.items())
    176 
    177     def test_first_while_deleting(self):
    178         # Test for bug 1725856
    179         self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
    180         for _ in self.d:
    181             key = self.f.first()[0]
    182             del self.f[key]
    183         self.assertEqual([], self.f.items(), "expected empty db after test")
    184 
    185     def test_last_while_deleting(self):
    186         # Test for bug 1725856's evil twin
    187         self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
    188         for _ in self.d:
    189             key = self.f.last()[0]
    190             del self.f[key]
    191         self.assertEqual([], self.f.items(), "expected empty db after test")
    192 
    193     def test_set_location(self):
    194         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
    195 
    196     def test_contains(self):
    197         for k in self.d:
    198             self.assertIn(k, self.f)
    199         self.assertNotIn('not here', self.f)
    200 
    201     def test_has_key(self):
    202         for k in self.d:
    203             self.assertTrue(self.f.has_key(k))
    204         self.assertTrue(not self.f.has_key('not here'))
    205 
    206     def test_clear(self):
    207         self.f.clear()
    208         self.assertEqual(len(self.f), 0)
    209 
    210     def test__no_deadlock_first(self, debug=0):
    211         # do this so that testers can see what function we're in in
    212         # verbose mode when we deadlock.
    213         sys.stdout.flush()
    214 
    215         # in pybsddb's _DBWithCursor this causes an internal DBCursor
    216         # object is created.  Other test_ methods in this class could
    217         # inadvertently cause the deadlock but an explicit test is needed.
    218         if debug: print "A"
    219         k,v = self.f.first()
    220         if debug: print "B", k
    221         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
    222         if debug: print "C"
    223         # if the bsddb implementation leaves the DBCursor open during
    224         # the database write and locking+threading support is enabled
    225         # the cursor's read lock will deadlock the write lock request..
    226 
    227         # test the iterator interface
    228         if True:
    229             if debug: print "D"
    230             i = self.f.iteritems()
    231             k,v = i.next()
    232             if debug: print "E"
    233             self.f[k] = "please don't deadlock"
    234             if debug: print "F"
    235             while 1:
    236                 try:
    237                     k,v = i.next()
    238                 except StopIteration:
    239                     break
    240             if debug: print "F2"
    241 
    242             i = iter(self.f)
    243             if debug: print "G"
    244             while i:
    245                 try:
    246                     if debug: print "H"
    247                     k = i.next()
    248                     if debug: print "I"
    249                     self.f[k] = "deadlocks-r-us"
    250                     if debug: print "J"
    251                 except StopIteration:
    252                     i = None
    253             if debug: print "K"
    254 
    255         # test the legacy cursor interface mixed with writes
    256         self.assertIn(self.f.first()[0], self.d)
    257         k = self.f.next()[0]
    258         self.assertIn(k, self.d)
    259         self.f[k] = "be gone with ye deadlocks"
    260         self.assertTrue(self.f[k], "be gone with ye deadlocks")
    261 
    262     def test_for_cursor_memleak(self):
    263         # do the bsddb._DBWithCursor iterator internals leak cursors?
    264         nc1 = len(self.f._cursor_refs)
    265         # create iterator
    266         i = self.f.iteritems()
    267         nc2 = len(self.f._cursor_refs)
    268         # use the iterator (should run to the first yield, creating the cursor)
    269         k, v = i.next()
    270         nc3 = len(self.f._cursor_refs)
    271         # destroy the iterator; this should cause the weakref callback
    272         # to remove the cursor object from self.f._cursor_refs
    273         del i
    274         nc4 = len(self.f._cursor_refs)
    275 
    276         self.assertEqual(nc1, nc2)
    277         self.assertEqual(nc1, nc4)
    278         self.assertTrue(nc3 == nc1+1)
    279 
    280     def test_popitem(self):
    281         k, v = self.f.popitem()
    282         self.assertIn(k, self.d)
    283         self.assertIn(v, self.d.values())
    284         self.assertNotIn(k, self.f)
    285         self.assertEqual(len(self.d)-1, len(self.f))
    286 
    287     def test_pop(self):
    288         k = 'w'
    289         v = self.f.pop(k)
    290         self.assertEqual(v, self.d[k])
    291         self.assertNotIn(k, self.f)
    292         self.assertNotIn(v, self.f.values())
    293         self.assertEqual(len(self.d)-1, len(self.f))
    294 
    295     def test_get(self):
    296         self.assertEqual(self.f.get('NotHere'), None)
    297         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
    298         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
    299 
    300     def test_setdefault(self):
    301         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
    302         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
    303 
    304     def test_update(self):
    305         new = dict(y='life', u='of', i='brian')
    306         self.f.update(new)
    307         self.d.update(new)
    308         for k, v in self.d.iteritems():
    309             self.assertEqual(self.f[k], v)
    310 
    311     def test_keyordering(self):
    312         if self.openmethod[0] is not bsddb.btopen:
    313             return
    314         keys = self.d.keys()
    315         keys.sort()
    316         self.assertEqual(self.f.first()[0], keys[0])
    317         self.assertEqual(self.f.next()[0], keys[1])
    318         self.assertEqual(self.f.last()[0], keys[-1])
    319         self.assertEqual(self.f.previous()[0], keys[-2])
    320         self.assertEqual(list(self.f), keys)
    321 
    322 class TestBTree(TestBSDDB):
    323     fname = test_support.TESTFN
    324     openmethod = [bsddb.btopen]
    325 
    326 class TestBTree_InMemory(TestBSDDB):
    327     fname = None
    328     openmethod = [bsddb.btopen]
    329 
    330 class TestBTree_InMemory_Truncate(TestBSDDB):
    331     fname = None
    332     openflag = 'n'
    333     openmethod = [bsddb.btopen]
    334 
    335 class TestHashTable(TestBSDDB):
    336     fname = test_support.TESTFN
    337     openmethod = [bsddb.hashopen]
    338 
    339 class TestHashTable_InMemory(TestBSDDB):
    340     fname = None
    341     openmethod = [bsddb.hashopen]
    342 
# NOTE: bsddb.rnopen ('Record Numbers') is not tested: 'put' for RECNO in
# bsddb 1.85 appears broken... at least on Solaris Intel - rmasse 1/97
    346 
    347 def test_main(verbose=None):
    348     test_support.run_unittest(
    349         TestBTree,
    350         TestHashTable,
    351         TestBTree_InMemory,
    352         TestHashTable_InMemory,
    353         TestBTree_InMemory_Truncate,
    354     )
    355 
# When executed directly as a script, run the whole suite via test_main().
if __name__ == "__main__":
    test_main(verbose=True)
    358