Home | History | Annotate | Download | only in test
      1 """Test script for the bsddb C module by Roger E. Masse
      2    Adapted to unittest format and expanded scope by Raymond Hettinger
      3 """
      4 import os, sys
      5 import unittest
      6 from test import test_support
      7 
# Skip this whole test module if the _bsddb extension wasn't built.
test_support.import_module('_bsddb')

# Import the bsddb package through test_support rather than a plain import;
# deprecated=True presumably keeps the expected deprecation warning out of
# the test output -- confirm against test_support.import_module.
bsddb = test_support.import_module('bsddb', deprecated=True)
# Just so we know it's imported:
test_support.import_module('dbhash', deprecated=True)
     14 
     15 
     16 class TestBSDDB(unittest.TestCase):
     17     openflag = 'c'
     18 
     19     def setUp(self):
     20         self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
     21         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
     22         for k, v in self.d.iteritems():
     23             self.f[k] = v
     24 
     25     def tearDown(self):
     26         self.f.sync()
     27         self.f.close()
     28         if self.fname is None:
     29             return
     30         try:
     31             os.remove(self.fname)
     32         except os.error:
     33             pass
     34 
     35     def test_getitem(self):
     36         for k, v in self.d.iteritems():
     37             self.assertEqual(self.f[k], v)
     38 
     39     def test_len(self):
     40         self.assertEqual(len(self.f), len(self.d))
     41 
     42     def test_change(self):
     43         self.f['r'] = 'discovered'
     44         self.assertEqual(self.f['r'], 'discovered')
     45         self.assertIn('r', self.f.keys())
     46         self.assertIn('discovered', self.f.values())
     47 
     48     def test_close_and_reopen(self):
     49         self.assertIsNotNone(self.fname)
     50         self.f.close()
     51         self.f = self.openmethod[0](self.fname, 'w')
     52         for k, v in self.d.iteritems():
     53             self.assertEqual(self.f[k], v)
     54 
     55     def assertSetEquals(self, seqn1, seqn2):
     56         self.assertEqual(set(seqn1), set(seqn2))
     57 
     58     def test_mapping_iteration_methods(self):
     59         f = self.f
     60         d = self.d
     61         self.assertSetEquals(d, f)
     62         self.assertSetEquals(d.keys(), f.keys())
     63         self.assertSetEquals(d.values(), f.values())
     64         self.assertSetEquals(d.items(), f.items())
     65         self.assertSetEquals(d.iterkeys(), f.iterkeys())
     66         self.assertSetEquals(d.itervalues(), f.itervalues())
     67         self.assertSetEquals(d.iteritems(), f.iteritems())
     68 
     69     def test_iter_while_modifying_values(self):
     70         di = iter(self.d)
     71         while 1:
     72             try:
     73                 key = di.next()
     74                 self.d[key] = 'modified '+key
     75             except StopIteration:
     76                 break
     77 
     78         # it should behave the same as a dict.  modifying values
     79         # of existing keys should not break iteration.  (adding
     80         # or removing keys should)
     81         loops_left = len(self.f)
     82         fi = iter(self.f)
     83         while 1:
     84             try:
     85                 key = fi.next()
     86                 self.f[key] = 'modified '+key
     87                 loops_left -= 1
     88             except StopIteration:
     89                 break
     90         self.assertEqual(loops_left, 0)
     91 
     92         self.test_mapping_iteration_methods()
     93 
     94     def test_iter_abort_on_changed_size(self):
     95         def DictIterAbort():
     96             di = iter(self.d)
     97             while 1:
     98                 try:
     99                     di.next()
    100                     self.d['newkey'] = 'SPAM'
    101                 except StopIteration:
    102                     break
    103         self.assertRaises(RuntimeError, DictIterAbort)
    104 
    105         def DbIterAbort():
    106             fi = iter(self.f)
    107             while 1:
    108                 try:
    109                     fi.next()
    110                     self.f['newkey'] = 'SPAM'
    111                 except StopIteration:
    112                     break
    113         self.assertRaises(RuntimeError, DbIterAbort)
    114 
    115     def test_iteritems_abort_on_changed_size(self):
    116         def DictIteritemsAbort():
    117             di = self.d.iteritems()
    118             while 1:
    119                 try:
    120                     di.next()
    121                     self.d['newkey'] = 'SPAM'
    122                 except StopIteration:
    123                     break
    124         self.assertRaises(RuntimeError, DictIteritemsAbort)
    125 
    126         def DbIteritemsAbort():
    127             fi = self.f.iteritems()
    128             while 1:
    129                 try:
    130                     key, value = fi.next()
    131                     del self.f[key]
    132                 except StopIteration:
    133                     break
    134         self.assertRaises(RuntimeError, DbIteritemsAbort)
    135 
    136     def test_iteritems_while_modifying_values(self):
    137         di = self.d.iteritems()
    138         while 1:
    139             try:
    140                 k, v = di.next()
    141                 self.d[k] = 'modified '+v
    142             except StopIteration:
    143                 break
    144 
    145         # it should behave the same as a dict.  modifying values
    146         # of existing keys should not break iteration.  (adding
    147         # or removing keys should)
    148         loops_left = len(self.f)
    149         fi = self.f.iteritems()
    150         while 1:
    151             try:
    152                 k, v = fi.next()
    153                 self.f[k] = 'modified '+v
    154                 loops_left -= 1
    155             except StopIteration:
    156                 break
    157         self.assertEqual(loops_left, 0)
    158 
    159         self.test_mapping_iteration_methods()
    160 
    161     def test_first_next_looping(self):
    162         items = [self.f.first()]
    163         for i in xrange(1, len(self.f)):
    164             items.append(self.f.next())
    165         self.assertSetEquals(items, self.d.items())
    166 
    167     def test_previous_last_looping(self):
    168         items = [self.f.last()]
    169         for i in xrange(1, len(self.f)):
    170             items.append(self.f.previous())
    171         self.assertSetEquals(items, self.d.items())
    172 
    173     def test_first_while_deleting(self):
    174         # Test for bug 1725856
    175         self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
    176         for _ in self.d:
    177             key = self.f.first()[0]
    178             del self.f[key]
    179         self.assertEqual([], self.f.items(), "expected empty db after test")
    180 
    181     def test_last_while_deleting(self):
    182         # Test for bug 1725856's evil twin
    183         self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
    184         for _ in self.d:
    185             key = self.f.last()[0]
    186             del self.f[key]
    187         self.assertEqual([], self.f.items(), "expected empty db after test")
    188 
    189     def test_set_location(self):
    190         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
    191 
    192     def test_contains(self):
    193         for k in self.d:
    194             self.assertIn(k, self.f)
    195         self.assertNotIn('not here', self.f)
    196 
    197     def test_has_key(self):
    198         for k in self.d:
    199             self.assertTrue(self.f.has_key(k))
    200         self.assertTrue(not self.f.has_key('not here'))
    201 
    202     def test_clear(self):
    203         self.f.clear()
    204         self.assertEqual(len(self.f), 0)
    205 
    206     def test__no_deadlock_first(self, debug=0):
    207         # do this so that testers can see what function we're in in
    208         # verbose mode when we deadlock.
    209         sys.stdout.flush()
    210 
    211         # in pybsddb's _DBWithCursor this causes an internal DBCursor
    212         # object is created.  Other test_ methods in this class could
    213         # inadvertently cause the deadlock but an explicit test is needed.
    214         if debug: print "A"
    215         k,v = self.f.first()
    216         if debug: print "B", k
    217         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
    218         if debug: print "C"
    219         # if the bsddb implementation leaves the DBCursor open during
    220         # the database write and locking+threading support is enabled
    221         # the cursor's read lock will deadlock the write lock request..
    222 
    223         # test the iterator interface
    224         if True:
    225             if debug: print "D"
    226             i = self.f.iteritems()
    227             k,v = i.next()
    228             if debug: print "E"
    229             self.f[k] = "please don't deadlock"
    230             if debug: print "F"
    231             while 1:
    232                 try:
    233                     k,v = i.next()
    234                 except StopIteration:
    235                     break
    236             if debug: print "F2"
    237 
    238             i = iter(self.f)
    239             if debug: print "G"
    240             while i:
    241                 try:
    242                     if debug: print "H"
    243                     k = i.next()
    244                     if debug: print "I"
    245                     self.f[k] = "deadlocks-r-us"
    246                     if debug: print "J"
    247                 except StopIteration:
    248                     i = None
    249             if debug: print "K"
    250 
    251         # test the legacy cursor interface mixed with writes
    252         self.assertIn(self.f.first()[0], self.d)
    253         k = self.f.next()[0]
    254         self.assertIn(k, self.d)
    255         self.f[k] = "be gone with ye deadlocks"
    256         self.assertTrue(self.f[k], "be gone with ye deadlocks")
    257 
    258     def test_for_cursor_memleak(self):
    259         # do the bsddb._DBWithCursor iterator internals leak cursors?
    260         nc1 = len(self.f._cursor_refs)
    261         # create iterator
    262         i = self.f.iteritems()
    263         nc2 = len(self.f._cursor_refs)
    264         # use the iterator (should run to the first yield, creating the cursor)
    265         k, v = i.next()
    266         nc3 = len(self.f._cursor_refs)
    267         # destroy the iterator; this should cause the weakref callback
    268         # to remove the cursor object from self.f._cursor_refs
    269         del i
    270         nc4 = len(self.f._cursor_refs)
    271 
    272         self.assertEqual(nc1, nc2)
    273         self.assertEqual(nc1, nc4)
    274         self.assertTrue(nc3 == nc1+1)
    275 
    276     def test_popitem(self):
    277         k, v = self.f.popitem()
    278         self.assertIn(k, self.d)
    279         self.assertIn(v, self.d.values())
    280         self.assertNotIn(k, self.f)
    281         self.assertEqual(len(self.d)-1, len(self.f))
    282 
    283     def test_pop(self):
    284         k = 'w'
    285         v = self.f.pop(k)
    286         self.assertEqual(v, self.d[k])
    287         self.assertNotIn(k, self.f)
    288         self.assertNotIn(v, self.f.values())
    289         self.assertEqual(len(self.d)-1, len(self.f))
    290 
    291     def test_get(self):
    292         self.assertEqual(self.f.get('NotHere'), None)
    293         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
    294         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
    295 
    296     def test_setdefault(self):
    297         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
    298         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
    299 
    300     def test_update(self):
    301         new = dict(y='life', u='of', i='brian')
    302         self.f.update(new)
    303         self.d.update(new)
    304         for k, v in self.d.iteritems():
    305             self.assertEqual(self.f[k], v)
    306 
    307     def test_keyordering(self):
    308         self.assertIs(self.openmethod[0], bsddb.btopen)
    309         keys = self.d.keys()
    310         keys.sort()
    311         self.assertEqual(self.f.first()[0], keys[0])
    312         self.assertEqual(self.f.next()[0], keys[1])
    313         self.assertEqual(self.f.last()[0], keys[-1])
    314         self.assertEqual(self.f.previous()[0], keys[-2])
    315         self.assertEqual(list(self.f), keys)
    316 
    317 class TestBTree(TestBSDDB):
    318     fname = test_support.TESTFN
    319     openmethod = [bsddb.btopen]
    320 
    321 class TestBTree_InMemory(TestBSDDB):
    322     fname = None
    323     openmethod = [bsddb.btopen]
    324 
    325     # if we're using an in-memory only db, we can't reopen it
    326     test_close_and_reopen = None
    327 
    328 class TestBTree_InMemory_Truncate(TestBSDDB):
    329     fname = None
    330     openflag = 'n'
    331     openmethod = [bsddb.btopen]
    332 
    333     # if we're using an in-memory only db, we can't reopen it
    334     test_close_and_reopen = None
    335 
    336 class TestHashTable(TestBSDDB):
    337     fname = test_support.TESTFN
    338     openmethod = [bsddb.hashopen]
    339 
    340     # keyordering is specific to btopen method
    341     test_keyordering = None
    342 
    343 class TestHashTable_InMemory(TestBSDDB):
    344     fname = None
    345     openmethod = [bsddb.hashopen]
    346 
    347     # if we're using an in-memory only db, we can't reopen it
    348     test_close_and_reopen = None
    349 
    350     # keyordering is specific to btopen method
    351     test_keyordering = None
    352 
## bsddb.rnopen ('Record Numbers') is not exercised by this file:
## 'put' for RECNO in bsddb 1.85 appears broken, at least on
## Solaris Intel. - rmasse 1/97
    356 
    357 def test_main(verbose=None):
    358     test_support.run_unittest(
    359         TestBTree,
    360         TestHashTable,
    361         TestBTree_InMemory,
    362         TestHashTable_InMemory,
    363         TestBTree_InMemory_Truncate,
    364     )
    365 
    366 if __name__ == "__main__":
    367     test_main(verbose=True)
    368