1 """ 2 TestCases for checking dbShelve objects. 3 """ 4 5 import os, string, sys 6 import random 7 import unittest 8 9 10 from test_all import db, dbshelve, test_support, verbose, \ 11 get_new_environment_path, get_new_database_path 12 13 14 15 16 17 #---------------------------------------------------------------------- 18 19 # We want the objects to be comparable so we can test dbshelve.values 20 # later on. 21 class DataClass: 22 def __init__(self): 23 self.value = random.random() 24 25 def __repr__(self) : # For Python 3.0 comparison 26 return "DataClass %f" %self.value 27 28 def __cmp__(self, other): # For Python 2.x comparison 29 return cmp(self.value, other) 30 31 32 class DBShelveTestCase(unittest.TestCase): 33 if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and 34 (sys.version_info < (3, 2))) : 35 def assertIn(self, a, b, msg=None) : 36 return self.assertTrue(a in b, msg=msg) 37 38 39 def setUp(self): 40 if sys.version_info[0] >= 3 : 41 from test_all import do_proxy_db_py3k 42 self._flag_proxy_db_py3k = do_proxy_db_py3k(False) 43 self.filename = get_new_database_path() 44 self.do_open() 45 46 def tearDown(self): 47 if sys.version_info[0] >= 3 : 48 from test_all import do_proxy_db_py3k 49 do_proxy_db_py3k(self._flag_proxy_db_py3k) 50 self.do_close() 51 test_support.unlink(self.filename) 52 53 def mk(self, key): 54 """Turn key into an appropriate key type for this db""" 55 # override in child class for RECNO 56 if sys.version_info[0] < 3 : 57 return key 58 else : 59 return bytes(key, "iso8859-1") # 8 bits 60 61 def populateDB(self, d): 62 for x in string.letters: 63 d[self.mk('S' + x)] = 10 * x # add a string 64 d[self.mk('I' + x)] = ord(x) # add an integer 65 d[self.mk('L' + x)] = [x] * 10 # add a list 66 67 inst = DataClass() # add an instance 68 inst.S = 10 * x 69 inst.I = ord(x) 70 inst.L = [x] * 10 71 d[self.mk('O' + x)] = inst 72 73 74 # overridable in derived classes to affect how the shelf is created/opened 75 def do_open(self): 76 self.d = dbshelve.open(self.filename) 77 78 # and closed... 79 def do_close(self): 80 self.d.close() 81 82 83 84 def test01_basics(self): 85 if verbose: 86 print '\n', '-=' * 30 87 print "Running %s.test01_basics..." % self.__class__.__name__ 88 89 self.populateDB(self.d) 90 self.d.sync() 91 self.do_close() 92 self.do_open() 93 d = self.d 94 95 l = len(d) 96 k = d.keys() 97 s = d.stat() 98 f = d.fd() 99 100 if verbose: 101 print "length:", l 102 print "keys:", k 103 print "stats:", s 104 105 self.assertEqual(0, d.has_key(self.mk('bad key'))) 106 self.assertEqual(1, d.has_key(self.mk('IA'))) 107 self.assertEqual(1, d.has_key(self.mk('OA'))) 108 109 d.delete(self.mk('IA')) 110 del d[self.mk('OA')] 111 self.assertEqual(0, d.has_key(self.mk('IA'))) 112 self.assertEqual(0, d.has_key(self.mk('OA'))) 113 self.assertEqual(len(d), l-2) 114 115 values = [] 116 for key in d.keys(): 117 value = d[key] 118 values.append(value) 119 if verbose: 120 print "%s: %s" % (key, value) 121 self.checkrec(key, value) 122 123 dbvalues = d.values() 124 self.assertEqual(len(dbvalues), len(d.keys())) 125 if sys.version_info < (2, 6) : 126 values.sort() 127 dbvalues.sort() 128 self.assertEqual(values, dbvalues) 129 else : # XXX: Convert all to strings. Please, improve 130 values.sort(key=lambda x : str(x)) 131 dbvalues.sort(key=lambda x : str(x)) 132 self.assertEqual(repr(values), repr(dbvalues)) 133 134 items = d.items() 135 self.assertEqual(len(items), len(values)) 136 137 for key, value in items: 138 self.checkrec(key, value) 139 140 self.assertEqual(d.get(self.mk('bad key')), None) 141 self.assertEqual(d.get(self.mk('bad key'), None), None) 142 self.assertEqual(d.get(self.mk('bad key'), 'a string'), 'a string') 143 self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3]) 144 145 d.set_get_returns_none(0) 146 self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key')) 147 d.set_get_returns_none(1) 148 149 d.put(self.mk('new key'), 'new data') 150 self.assertEqual(d.get(self.mk('new key')), 'new data') 151 self.assertEqual(d[self.mk('new key')], 'new data') 152 153 154 155 def test02_cursors(self): 156 if verbose: 157 print '\n', '-=' * 30 158 print "Running %s.test02_cursors..." % self.__class__.__name__ 159 160 self.populateDB(self.d) 161 d = self.d 162 163 count = 0 164 c = d.cursor() 165 rec = c.first() 166 while rec is not None: 167 count = count + 1 168 if verbose: 169 print rec 170 key, value = rec 171 self.checkrec(key, value) 172 # Hack to avoid conversion by 2to3 tool 173 rec = getattr(c, "next")() 174 del c 175 176 self.assertEqual(count, len(d)) 177 178 count = 0 179 c = d.cursor() 180 rec = c.last() 181 while rec is not None: 182 count = count + 1 183 if verbose: 184 print rec 185 key, value = rec 186 self.checkrec(key, value) 187 rec = c.prev() 188 189 self.assertEqual(count, len(d)) 190 191 c.set(self.mk('SS')) 192 key, value = c.current() 193 self.checkrec(key, value) 194 del c 195 196 197 def test03_append(self): 198 # NOTE: this is overridden in RECNO subclass, don't change its name. 199 if verbose: 200 print '\n', '-=' * 30 201 print "Running %s.test03_append..." % self.__class__.__name__ 202 203 self.assertRaises(dbshelve.DBShelveError, 204 self.d.append, 'unit test was here') 205 206 207 def test04_iterable(self) : 208 self.populateDB(self.d) 209 d = self.d 210 keys = d.keys() 211 keyset = set(keys) 212 self.assertEqual(len(keyset), len(keys)) 213 214 for key in d : 215 self.assertIn(key, keyset) 216 keyset.remove(key) 217 self.assertEqual(len(keyset), 0) 218 219 def checkrec(self, key, value): 220 # override this in a subclass if the key type is different 221 222 if sys.version_info[0] >= 3 : 223 if isinstance(key, bytes) : 224 key = key.decode("iso8859-1") # 8 bits 225 226 x = key[1] 227 if key[0] == 'S': 228 self.assertEqual(type(value), str) 229 self.assertEqual(value, 10 * x) 230 231 elif key[0] == 'I': 232 self.assertEqual(type(value), int) 233 self.assertEqual(value, ord(x)) 234 235 elif key[0] == 'L': 236 self.assertEqual(type(value), list) 237 self.assertEqual(value, [x] * 10) 238 239 elif key[0] == 'O': 240 if sys.version_info[0] < 3 : 241 from types import InstanceType 242 self.assertEqual(type(value), InstanceType) 243 else : 244 self.assertEqual(type(value), DataClass) 245 246 self.assertEqual(value.S, 10 * x) 247 self.assertEqual(value.I, ord(x)) 248 self.assertEqual(value.L, [x] * 10) 249 250 else: 251 self.assertTrue(0, 'Unknown key type, fix the test') 252 253 #---------------------------------------------------------------------- 254 255 class BasicShelveTestCase(DBShelveTestCase): 256 def do_open(self): 257 self.d = dbshelve.DBShelf() 258 self.d.open(self.filename, self.dbtype, self.dbflags) 259 260 def do_close(self): 261 self.d.close() 262 263 264 class BTreeShelveTestCase(BasicShelveTestCase): 265 dbtype = db.DB_BTREE 266 dbflags = db.DB_CREATE 267 268 269 class HashShelveTestCase(BasicShelveTestCase): 270 dbtype = db.DB_HASH 271 dbflags = db.DB_CREATE 272 273 274 class ThreadBTreeShelveTestCase(BasicShelveTestCase): 275 dbtype = db.DB_BTREE 276 dbflags = db.DB_CREATE | db.DB_THREAD 277 278 279 class ThreadHashShelveTestCase(BasicShelveTestCase): 280 dbtype = db.DB_HASH 281 dbflags = db.DB_CREATE | db.DB_THREAD 282 283 284 #---------------------------------------------------------------------- 285 286 class BasicEnvShelveTestCase(DBShelveTestCase): 287 def do_open(self): 288 self.env = db.DBEnv() 289 self.env.open(self.homeDir, 290 self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE) 291 292 self.filename = os.path.split(self.filename)[1] 293 self.d = dbshelve.DBShelf(self.env) 294 self.d.open(self.filename, self.dbtype, self.dbflags) 295 296 297 def do_close(self): 298 self.d.close() 299 self.env.close() 300 301 302 def setUp(self) : 303 self.homeDir = get_new_environment_path() 304 DBShelveTestCase.setUp(self) 305 306 def tearDown(self): 307 if sys.version_info[0] >= 3 : 308 from test_all import do_proxy_db_py3k 309 do_proxy_db_py3k(self._flag_proxy_db_py3k) 310 self.do_close() 311 test_support.rmtree(self.homeDir) 312 313 314 class EnvBTreeShelveTestCase(BasicEnvShelveTestCase): 315 envflags = 0 316 dbtype = db.DB_BTREE 317 dbflags = db.DB_CREATE 318 319 320 class EnvHashShelveTestCase(BasicEnvShelveTestCase): 321 envflags = 0 322 dbtype = db.DB_HASH 323 dbflags = db.DB_CREATE 324 325 326 class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase): 327 envflags = db.DB_THREAD 328 dbtype = db.DB_BTREE 329 dbflags = db.DB_CREATE | db.DB_THREAD 330 331 332 class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase): 333 envflags = db.DB_THREAD 334 dbtype = db.DB_HASH 335 dbflags = db.DB_CREATE | db.DB_THREAD 336 337 338 #---------------------------------------------------------------------- 339 # test cases for a DBShelf in a RECNO DB. 340 341 class RecNoShelveTestCase(BasicShelveTestCase): 342 dbtype = db.DB_RECNO 343 dbflags = db.DB_CREATE 344 345 def setUp(self): 346 BasicShelveTestCase.setUp(self) 347 348 # pool to assign integer key values out of 349 self.key_pool = list(range(1, 5000)) 350 self.key_map = {} # map string keys to the number we gave them 351 self.intkey_map = {} # reverse map of above 352 353 def mk(self, key): 354 if key not in self.key_map: 355 self.key_map[key] = self.key_pool.pop(0) 356 self.intkey_map[self.key_map[key]] = key 357 return self.key_map[key] 358 359 def checkrec(self, intkey, value): 360 key = self.intkey_map[intkey] 361 BasicShelveTestCase.checkrec(self, key, value) 362 363 def test03_append(self): 364 if verbose: 365 print '\n', '-=' * 30 366 print "Running %s.test03_append..." % self.__class__.__name__ 367 368 self.d[1] = 'spam' 369 self.d[5] = 'eggs' 370 self.assertEqual(6, self.d.append('spam')) 371 self.assertEqual(7, self.d.append('baked beans')) 372 self.assertEqual('spam', self.d.get(6)) 373 self.assertEqual('spam', self.d.get(1)) 374 self.assertEqual('baked beans', self.d.get(7)) 375 self.assertEqual('eggs', self.d.get(5)) 376 377 378 #---------------------------------------------------------------------- 379 380 def test_suite(): 381 suite = unittest.TestSuite() 382 383 suite.addTest(unittest.makeSuite(DBShelveTestCase)) 384 suite.addTest(unittest.makeSuite(BTreeShelveTestCase)) 385 suite.addTest(unittest.makeSuite(HashShelveTestCase)) 386 suite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase)) 387 suite.addTest(unittest.makeSuite(ThreadHashShelveTestCase)) 388 suite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase)) 389 suite.addTest(unittest.makeSuite(EnvHashShelveTestCase)) 390 suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase)) 391 suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase)) 392 suite.addTest(unittest.makeSuite(RecNoShelveTestCase)) 393 394 return suite 395 396 397 if __name__ == '__main__': 398 unittest.main(defaultTest='test_suite') 399