1 #! /usr/bin/env python 2 """Test script for the bsddb C module by Roger E. Masse 3 Adapted to unittest format and expanded scope by Raymond Hettinger 4 """ 5 import os, sys 6 import unittest 7 from test import test_support 8 9 # Skip test if _bsddb wasn't built. 10 test_support.import_module('_bsddb') 11 12 bsddb = test_support.import_module('bsddb', deprecated=True) 13 # Just so we know it's imported: 14 test_support.import_module('dbhash', deprecated=True) 15 16 17 class TestBSDDB(unittest.TestCase): 18 openflag = 'c' 19 20 def setUp(self): 21 self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768) 22 self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='') 23 for k, v in self.d.iteritems(): 24 self.f[k] = v 25 26 def tearDown(self): 27 self.f.sync() 28 self.f.close() 29 if self.fname is None: 30 return 31 try: 32 os.remove(self.fname) 33 except os.error: 34 pass 35 36 def test_getitem(self): 37 for k, v in self.d.iteritems(): 38 self.assertEqual(self.f[k], v) 39 40 def test_len(self): 41 self.assertEqual(len(self.f), len(self.d)) 42 43 def test_change(self): 44 self.f['r'] = 'discovered' 45 self.assertEqual(self.f['r'], 'discovered') 46 self.assertIn('r', self.f.keys()) 47 self.assertIn('discovered', self.f.values()) 48 49 def test_close_and_reopen(self): 50 if self.fname is None: 51 # if we're using an in-memory only db, we can't reopen it 52 # so finish here. 53 return 54 self.f.close() 55 self.f = self.openmethod[0](self.fname, 'w') 56 for k, v in self.d.iteritems(): 57 self.assertEqual(self.f[k], v) 58 59 def assertSetEquals(self, seqn1, seqn2): 60 self.assertEqual(set(seqn1), set(seqn2)) 61 62 def test_mapping_iteration_methods(self): 63 f = self.f 64 d = self.d 65 self.assertSetEquals(d, f) 66 self.assertSetEquals(d.keys(), f.keys()) 67 self.assertSetEquals(d.values(), f.values()) 68 self.assertSetEquals(d.items(), f.items()) 69 self.assertSetEquals(d.iterkeys(), f.iterkeys()) 70 self.assertSetEquals(d.itervalues(), f.itervalues()) 71 self.assertSetEquals(d.iteritems(), f.iteritems()) 72 73 def test_iter_while_modifying_values(self): 74 di = iter(self.d) 75 while 1: 76 try: 77 key = di.next() 78 self.d[key] = 'modified '+key 79 except StopIteration: 80 break 81 82 # it should behave the same as a dict. modifying values 83 # of existing keys should not break iteration. (adding 84 # or removing keys should) 85 loops_left = len(self.f) 86 fi = iter(self.f) 87 while 1: 88 try: 89 key = fi.next() 90 self.f[key] = 'modified '+key 91 loops_left -= 1 92 except StopIteration: 93 break 94 self.assertEqual(loops_left, 0) 95 96 self.test_mapping_iteration_methods() 97 98 def test_iter_abort_on_changed_size(self): 99 def DictIterAbort(): 100 di = iter(self.d) 101 while 1: 102 try: 103 di.next() 104 self.d['newkey'] = 'SPAM' 105 except StopIteration: 106 break 107 self.assertRaises(RuntimeError, DictIterAbort) 108 109 def DbIterAbort(): 110 fi = iter(self.f) 111 while 1: 112 try: 113 fi.next() 114 self.f['newkey'] = 'SPAM' 115 except StopIteration: 116 break 117 self.assertRaises(RuntimeError, DbIterAbort) 118 119 def test_iteritems_abort_on_changed_size(self): 120 def DictIteritemsAbort(): 121 di = self.d.iteritems() 122 while 1: 123 try: 124 di.next() 125 self.d['newkey'] = 'SPAM' 126 except StopIteration: 127 break 128 self.assertRaises(RuntimeError, DictIteritemsAbort) 129 130 def DbIteritemsAbort(): 131 fi = self.f.iteritems() 132 while 1: 133 try: 134 key, value = fi.next() 135 del self.f[key] 136 except StopIteration: 137 break 138 self.assertRaises(RuntimeError, DbIteritemsAbort) 139 140 def test_iteritems_while_modifying_values(self): 141 di = self.d.iteritems() 142 while 1: 143 try: 144 k, v = di.next() 145 self.d[k] = 'modified '+v 146 except StopIteration: 147 break 148 149 # it should behave the same as a dict. modifying values 150 # of existing keys should not break iteration. (adding 151 # or removing keys should) 152 loops_left = len(self.f) 153 fi = self.f.iteritems() 154 while 1: 155 try: 156 k, v = fi.next() 157 self.f[k] = 'modified '+v 158 loops_left -= 1 159 except StopIteration: 160 break 161 self.assertEqual(loops_left, 0) 162 163 self.test_mapping_iteration_methods() 164 165 def test_first_next_looping(self): 166 items = [self.f.first()] 167 for i in xrange(1, len(self.f)): 168 items.append(self.f.next()) 169 self.assertSetEquals(items, self.d.items()) 170 171 def test_previous_last_looping(self): 172 items = [self.f.last()] 173 for i in xrange(1, len(self.f)): 174 items.append(self.f.previous()) 175 self.assertSetEquals(items, self.d.items()) 176 177 def test_first_while_deleting(self): 178 # Test for bug 1725856 179 self.assertTrue(len(self.d) >= 2, "test requires >=2 items") 180 for _ in self.d: 181 key = self.f.first()[0] 182 del self.f[key] 183 self.assertEqual([], self.f.items(), "expected empty db after test") 184 185 def test_last_while_deleting(self): 186 # Test for bug 1725856's evil twin 187 self.assertTrue(len(self.d) >= 2, "test requires >=2 items") 188 for _ in self.d: 189 key = self.f.last()[0] 190 del self.f[key] 191 self.assertEqual([], self.f.items(), "expected empty db after test") 192 193 def test_set_location(self): 194 self.assertEqual(self.f.set_location('e'), ('e', self.d['e'])) 195 196 def test_contains(self): 197 for k in self.d: 198 self.assertIn(k, self.f) 199 self.assertNotIn('not here', self.f) 200 201 def test_has_key(self): 202 for k in self.d: 203 self.assertTrue(self.f.has_key(k)) 204 self.assertTrue(not self.f.has_key('not here')) 205 206 def test_clear(self): 207 self.f.clear() 208 self.assertEqual(len(self.f), 0) 209 210 def test__no_deadlock_first(self, debug=0): 211 # do this so that testers can see what function we're in in 212 # verbose mode when we deadlock. 213 sys.stdout.flush() 214 215 # in pybsddb's _DBWithCursor this causes an internal DBCursor 216 # object is created. Other test_ methods in this class could 217 # inadvertently cause the deadlock but an explicit test is needed. 218 if debug: print "A" 219 k,v = self.f.first() 220 if debug: print "B", k 221 self.f[k] = "deadlock. do not pass go. do not collect $200." 222 if debug: print "C" 223 # if the bsddb implementation leaves the DBCursor open during 224 # the database write and locking+threading support is enabled 225 # the cursor's read lock will deadlock the write lock request.. 226 227 # test the iterator interface 228 if True: 229 if debug: print "D" 230 i = self.f.iteritems() 231 k,v = i.next() 232 if debug: print "E" 233 self.f[k] = "please don't deadlock" 234 if debug: print "F" 235 while 1: 236 try: 237 k,v = i.next() 238 except StopIteration: 239 break 240 if debug: print "F2" 241 242 i = iter(self.f) 243 if debug: print "G" 244 while i: 245 try: 246 if debug: print "H" 247 k = i.next() 248 if debug: print "I" 249 self.f[k] = "deadlocks-r-us" 250 if debug: print "J" 251 except StopIteration: 252 i = None 253 if debug: print "K" 254 255 # test the legacy cursor interface mixed with writes 256 self.assertIn(self.f.first()[0], self.d) 257 k = self.f.next()[0] 258 self.assertIn(k, self.d) 259 self.f[k] = "be gone with ye deadlocks" 260 self.assertTrue(self.f[k], "be gone with ye deadlocks") 261 262 def test_for_cursor_memleak(self): 263 # do the bsddb._DBWithCursor iterator internals leak cursors? 264 nc1 = len(self.f._cursor_refs) 265 # create iterator 266 i = self.f.iteritems() 267 nc2 = len(self.f._cursor_refs) 268 # use the iterator (should run to the first yield, creating the cursor) 269 k, v = i.next() 270 nc3 = len(self.f._cursor_refs) 271 # destroy the iterator; this should cause the weakref callback 272 # to remove the cursor object from self.f._cursor_refs 273 del i 274 nc4 = len(self.f._cursor_refs) 275 276 self.assertEqual(nc1, nc2) 277 self.assertEqual(nc1, nc4) 278 self.assertTrue(nc3 == nc1+1) 279 280 def test_popitem(self): 281 k, v = self.f.popitem() 282 self.assertIn(k, self.d) 283 self.assertIn(v, self.d.values()) 284 self.assertNotIn(k, self.f) 285 self.assertEqual(len(self.d)-1, len(self.f)) 286 287 def test_pop(self): 288 k = 'w' 289 v = self.f.pop(k) 290 self.assertEqual(v, self.d[k]) 291 self.assertNotIn(k, self.f) 292 self.assertNotIn(v, self.f.values()) 293 self.assertEqual(len(self.d)-1, len(self.f)) 294 295 def test_get(self): 296 self.assertEqual(self.f.get('NotHere'), None) 297 self.assertEqual(self.f.get('NotHere', 'Default'), 'Default') 298 self.assertEqual(self.f.get('q', 'Default'), self.d['q']) 299 300 def test_setdefault(self): 301 self.assertEqual(self.f.setdefault('new', 'dog'), 'dog') 302 self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r']) 303 304 def test_update(self): 305 new = dict(y='life', u='of', i='brian') 306 self.f.update(new) 307 self.d.update(new) 308 for k, v in self.d.iteritems(): 309 self.assertEqual(self.f[k], v) 310 311 def test_keyordering(self): 312 if self.openmethod[0] is not bsddb.btopen: 313 return 314 keys = self.d.keys() 315 keys.sort() 316 self.assertEqual(self.f.first()[0], keys[0]) 317 self.assertEqual(self.f.next()[0], keys[1]) 318 self.assertEqual(self.f.last()[0], keys[-1]) 319 self.assertEqual(self.f.previous()[0], keys[-2]) 320 self.assertEqual(list(self.f), keys) 321 322 class TestBTree(TestBSDDB): 323 fname = test_support.TESTFN 324 openmethod = [bsddb.btopen] 325 326 class TestBTree_InMemory(TestBSDDB): 327 fname = None 328 openmethod = [bsddb.btopen] 329 330 class TestBTree_InMemory_Truncate(TestBSDDB): 331 fname = None 332 openflag = 'n' 333 openmethod = [bsddb.btopen] 334 335 class TestHashTable(TestBSDDB): 336 fname = test_support.TESTFN 337 openmethod = [bsddb.hashopen] 338 339 class TestHashTable_InMemory(TestBSDDB): 340 fname = None 341 openmethod = [bsddb.hashopen] 342 343 ## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85 344 ## # appears broken... at least on 345 ## # Solaris Intel - rmasse 1/97 346 347 def test_main(verbose=None): 348 test_support.run_unittest( 349 TestBTree, 350 TestHashTable, 351 TestBTree_InMemory, 352 TestHashTable_InMemory, 353 TestBTree_InMemory_Truncate, 354 ) 355 356 if __name__ == "__main__": 357 test_main(verbose=True) 358