#!/usr/bin/env python
#
#-----------------------------------------------------------------------
# A test suite for the table interface built on bsddb.db
#-----------------------------------------------------------------------
#
# Copyright (C) 2000, 2001 by Autonomous Zone Industries
# Copyright (C) 2002 Gregory P. Smith
#
# March 20, 2000
#
# License:      This is free software.  You may use this software for any
#               purpose including modification/redistribution, so long as
#               this header remains intact and that you do not claim any
#               rights of ownership or authorship of this software.  This
#               software has been tested, but no warranty is expressed or
#               implied.
#
#   --  Gregory P. Smith <greg (at] krypto.org>
#
# $Id$

import os, re, sys

if sys.version_info[0] < 3 :
    try:
        import cPickle
        pickle = cPickle
    except ImportError:
        import pickle
else :
    import pickle

import unittest
from test_all import db, dbtables, test_support, verbose, \
        get_new_environment_path, get_new_database_path

#----------------------------------------------------------------------

def _dump_column(value):
    """Pickle *value* with protocol 1 into the form stored in a column.

    On Python 2 the pickle byte string is returned unchanged.  On Python 3
    the bytes are decoded as ISO 8859-1 (one character per byte value) so
    that a ``str`` is handed to the table layer; _load_column() reverses it.
    """
    data = pickle.dumps(value, 1)
    if sys.version_info[0] >= 3:
        data = data.decode("iso8859-1")  # 8 bits
    return data


def _load_column(text):
    """Unpickle a column value produced by _dump_column().

    On Python 3 the ``str`` is first re-encoded as ISO 8859-1 to recover the
    original pickle bytes; on Python 2 it is passed to pickle as is.
    """
    if sys.version_info[0] >= 3:
        text = bytes(text, "iso8859-1")
    return pickle.loads(text)


class TableDBTestCase(unittest.TestCase):
    """Exercise dbtables.bsdTableDB: create/drop, insert, select, delete, modify.

    Every test runs against a bsdTableDB created by setUp() in a new
    environment directory, which tearDown() closes and removes.
    """

    db_name = 'test-table.db'

    def setUp(self):
        """Create a new environment directory and open a table DB in it."""
        if sys.version_info[0] >= 3 :
            # Imported here because it is only needed (and only looked up)
            # when running under Python 3.
            from test_all import do_proxy_db_py3k
            # The returned flag is handed back in tearDown().
            # NOTE(review): presumably the previous proxy setting - confirm
            # against test_all.
            self._flag_proxy_db_py3k = do_proxy_db_py3k(False)

        self.testHomeDir = get_new_environment_path()
        self.tdb = dbtables.bsdTableDB(
            filename='tabletest.db', dbhome=self.testHomeDir, create=1)

    def tearDown(self):
        """Close the table DB, restore the py3k proxy flag, remove the dir."""
        self.tdb.close()
        if sys.version_info[0] >= 3 :
            from test_all import do_proxy_db_py3k
            do_proxy_db_py3k(self._flag_proxy_db_py3k)
        test_support.rmtree(self.testHomeDir)

    def test01(self):
        """A pickled float survives an Insert/Select round trip."""
        tabname = "test01"
        colname = 'cool numbers'
        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        self.tdb.CreateTable(tabname, [colname])
        self.tdb.Insert(tabname, {colname: _dump_column(3.14159)})

        if verbose:
            self.tdb._db_print()

        values = self.tdb.Select(
            tabname, [colname], conditions={colname: None})

        colval = _load_column(values[0][colname])
        self.assertTrue(colval > 3.141)
        self.assertTrue(colval < 3.142)


    def test02(self):
        """Select with a callable condition applied to pickled values."""
        tabname = "test02"
        col0 = 'coolness factor'
        col1 = 'but can it fly?'
        col2 = 'Species'

        testinfo = [
            {col0: _dump_column(8), col1: 'no', col2: 'Penguin'},
            {col0: _dump_column(-1), col1: 'no', col2: 'Turkey'},
            {col0: _dump_column(9), col1: 'yes', col2: 'SR-71A Blackbird'},
        ]

        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        self.tdb.CreateTable(tabname, [col0, col1, col2])
        for row in testinfo :
            self.tdb.Insert(tabname, row)

        values = self.tdb.Select(
            tabname, [col2],
            conditions={col0: lambda x: _load_column(x) >= 8})

        # The two matching rows may come back in either order.
        self.assertEqual(len(values), 2)
        if values[0]['Species'] == 'Penguin' :
            self.assertEqual(values[1]['Species'], 'SR-71A Blackbird')
        elif values[0]['Species'] == 'SR-71A Blackbird' :
            self.assertEqual(values[1]['Species'], 'Penguin')
        else :
            if verbose:
                print("values= %r" % (values,))
            raise RuntimeError("Wrong values returned!")

    def test03(self):
        """Create/Drop cycle, bad-column errors, regex/prefix/exact selects."""
        tabname = "test03"
        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        if verbose:
            print('...before CreateTable...')
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
        if verbose:
            print('...after CreateTable...')
            self.tdb._db_print()
        self.tdb.Drop(tabname)
        if verbose:
            print('...after Drop...')
            self.tdb._db_print()
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        # 'f' is not a column of the table, so the Insert must be rejected.
        try:
            self.tdb.Insert(tabname,
                            {'a': "",
                             'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
                             'f': "Zero"})
            self.fail('Expected an exception')
        except dbtables.TableDBError:
            pass

        # 'foo' is not a column either, so the Select must be rejected.
        try:
            self.tdb.Select(tabname, [], conditions={'foo': '123'})
            self.fail('Expected an exception')
        except dbtables.TableDBError:
            pass

        self.tdb.Insert(tabname,
                        {'a': '42',
                         'b': "bad",
                         'c': "meep",
                         'e': 'Fuzzy wuzzy was a bear'})
        self.tdb.Insert(tabname,
                        {'a': '581750',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'fuzzy was here'})
        self.tdb.Insert(tabname,
                        {'a': '800000',
                         'b': "good",
                         'd': "bla",
                         'c': "black",
                         'e': 'Fuzzy wuzzy is a bear'})

        if verbose:
            self.tdb._db_print()

        # this should return two rows
        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
            conditions={'e': re.compile('wuzzy').search,
                        'a': re.compile('^[0-9]+$').match})
        self.assertEqual(len(values), 2)

        # now lets delete one of them and try again
        self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
        values = self.tdb.Select(
            tabname, ['a', 'd', 'b'],
            conditions={'e': dbtables.PrefixCond('Fuzzy')})
        self.assertEqual(len(values), 1)
        self.assertEqual(values[0]['d'], None)

        values = self.tdb.Select(tabname, ['b'],
            conditions={'c': lambda c: c == 'meep'})
        self.assertEqual(len(values), 1)
        self.assertEqual(values[0]['b'], "bad")


    def test04_MultiCondSelect(self):
        """Several conditions that no single row satisfies select nothing."""
        tabname = "test04_MultiCondSelect"
        try:
            self.tdb.Drop(tabname)
        except dbtables.TableDBError:
            pass
        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])

        # 'f' is not a column of the table, so the Insert must be rejected.
        try:
            self.tdb.Insert(tabname,
                            {'a': "",
                             'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
                             'f': "Zero"})
            self.fail('Expected an exception')
        except dbtables.TableDBError:
            pass

        self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
                                  'e': "E"})
        self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
                                  'e': "-E"})
        self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
                                  'e': "E-"})

        if verbose:
            self.tdb._db_print()

        # This select should return 0 rows.  it is designed to test
        # the bug identified and fixed in sourceforge bug # 590449
        # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
        # and supplying a fix!!  This one caused many headaches to say
        # the least...)
        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
            conditions={'e': dbtables.ExactCond('E'),
                        'a': dbtables.ExactCond('A'),
                        'd': dbtables.PrefixCond('-')
                       } )
        self.assertEqual(len(values), 0, values)


    def test_CreateOrExtend(self):
        """CreateOrExtendTable adds new columns to an existing table."""
        tabname = "test_CreateOrExtend"

        self.tdb.CreateOrExtendTable(
            tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])
        # Catch only TableDBError: a bare ``except`` would also swallow the
        # AssertionError raised by self.fail() and the check could never fail.
        try:
            self.tdb.Insert(tabname,
                            {'taste': 'crap',
                             'filling': 'no',
                             'is it Guinness?': 'no'})
            self.fail("Insert should've failed due to bad column name")
        except dbtables.TableDBError:
            pass
        self.tdb.CreateOrExtendTable(tabname,
                                     ['name', 'taste', 'is it Guinness?'])

        # these should both succeed as the table should contain the union
        # of both sets of columns.
        self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
                                  'is it Guinness?': 'no'})
        self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
                                  'is it Guinness?': 'yes',
                                  'name': 'Guinness'})


    def test_CondObjs(self):
        """Select using the PrefixCond, LikeCond, ExactCond and Cond objects."""
        tabname = "test_CondObjs"

        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])

        self.tdb.Insert(tabname, {'a': "the letter A",
                                  'b': "the letter B",
                                  'c': "is for cookie"})
        self.tdb.Insert(tabname, {'a': "is for aardvark",
                                  'e': "the letter E",
                                  'c': "is for cookie",
                                  'd': "is for dog"})
        self.tdb.Insert(tabname, {'a': "the letter A",
                                  'e': "the letter E",
                                  'c': "is for cookie",
                                  'p': "is for Python"})

        values = self.tdb.Select(
            tabname, ['p', 'e'],
            conditions={'e': dbtables.PrefixCond('the l')})
        self.assertEqual(len(values), 2, values)
        self.assertEqual(values[0]['e'], values[1]['e'], values)
        self.assertNotEqual(values[0]['p'], values[1]['p'], values)

        values = self.tdb.Select(
            tabname, ['d', 'a'],
            conditions={'a': dbtables.LikeCond('%aardvark%')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['d'], "is for dog", values)
        self.assertEqual(values[0]['a'], "is for aardvark", values)

        values = self.tdb.Select(tabname, None,
                                 {'b': dbtables.Cond(),
                                  'e': dbtables.LikeCond('%letter%'),
                                  'a': dbtables.PrefixCond('is'),
                                  'd': dbtables.ExactCond('is for dog'),
                                  'c': dbtables.PrefixCond('is for'),
                                  'p': lambda s: not s})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['d'], "is for dog", values)
        self.assertEqual(values[0]['a'], "is for aardvark", values)

    def test_Delete(self):
        """Delete removes matching rows even when some columns are unset."""
        tabname = "test_Delete"
        self.tdb.CreateTable(tabname, ['x', 'y', 'z'])

        # prior to 2001-05-09 there was a bug where Delete() would
        # fail if it encountered any rows that did not have values in
        # every column.
        # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff (at] nic.fi)
        self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'})
        self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'})

        self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
        values = self.tdb.Select(tabname, ['y'],
                                 conditions={'x': dbtables.PrefixCond('X')})
        self.assertEqual(len(values), 0)

    def test_Modify(self):
        """Modify applies mapping callables only to rows matching conditions."""
        tabname = "test_Modify"
        self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])

        self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
                                  'Type': 'Word', 'Access': '8'})
        self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
        self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})

        def set_type(current):
            # Fill in a type only where none is stored yet.
            if current is None:
                return 'MP3'
            return current

        def increment_access(count):
            return str(int(count)+1)

        def remove_value(value):
            # Returning None is expected to delete the stored value; the
            # Select on 'Unknown' below checks that 'Access' reads back None.
            return None

        self.tdb.Modify(tabname,
                        conditions={'Access': dbtables.ExactCond('0')},
                        mappings={'Access': remove_value})
        self.tdb.Modify(tabname,
                        conditions={'Name': dbtables.LikeCond('%MP3%')},
                        mappings={'Type': set_type})
        self.tdb.Modify(tabname,
                        conditions={'Name': dbtables.LikeCond('%')},
                        mappings={'Access': increment_access})

        try:
            self.tdb.Modify(tabname,
                            conditions={'Name': dbtables.LikeCond('%')},
                            mappings={'Access': 'What is your quest?'})
        except TypeError:
            # success, the string value in mappings isn't callable
            pass
        else:
            raise RuntimeError(
                "why was TypeError not raised for bad callable?")

        # Delete key in select conditions
        values = self.tdb.Select(
            tabname, None,
            conditions={'Type': dbtables.ExactCond('Unknown')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Name'], None, values)
        self.assertEqual(values[0]['Access'], None, values)

        # Modify value by select conditions
        values = self.tdb.Select(
            tabname, None,
            conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Type'], "MP3", values)
        self.assertEqual(values[0]['Access'], "2", values)

        # Make sure change applied only to select conditions
        values = self.tdb.Select(
            tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
        self.assertEqual(len(values), 1, values)
        self.assertEqual(values[0]['Type'], "Word", values)
        self.assertEqual(values[0]['Access'], "9", values)


def test_suite():
    """Return a TestSuite holding every test method of TableDBTestCase."""
    suite = unittest.TestSuite()
    # loadTestsFromTestCase replaces unittest.makeSuite, which was removed
    # in Python 3.13; it is available on Python 2 as well.
    suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(TableDBTestCase))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')