1 """TestCases for exercising a Recno DB. 2 """ 3 4 import os, sys 5 import errno 6 from pprint import pprint 7 import unittest 8 9 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path 10 11 letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' 12 13 14 #---------------------------------------------------------------------- 15 16 class SimpleRecnoTestCase(unittest.TestCase): 17 if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and 18 (sys.version_info < (3, 2))) : 19 def assertIsInstance(self, obj, datatype, msg=None) : 20 return self.assertEqual(type(obj), datatype, msg=msg) 21 def assertGreaterEqual(self, a, b, msg=None) : 22 return self.assertTrue(a>=b, msg=msg) 23 24 25 def setUp(self): 26 self.filename = get_new_database_path() 27 self.homeDir = None 28 29 def tearDown(self): 30 test_support.unlink(self.filename) 31 if self.homeDir: 32 test_support.rmtree(self.homeDir) 33 34 def test01_basic(self): 35 d = db.DB() 36 37 get_returns_none = d.set_get_returns_none(2) 38 d.set_get_returns_none(get_returns_none) 39 40 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 41 42 for x in letters: 43 recno = d.append(x * 60) 44 self.assertIsInstance(recno, int) 45 self.assertGreaterEqual(recno, 1) 46 if verbose: 47 print recno, 48 49 if verbose: print 50 51 stat = d.stat() 52 if verbose: 53 pprint(stat) 54 55 for recno in range(1, len(d)+1): 56 data = d[recno] 57 if verbose: 58 print data 59 60 self.assertIsInstance(data, str) 61 self.assertEqual(data, d.get(recno)) 62 63 try: 64 data = d[0] # This should raise a KeyError!?!?! 65 except db.DBInvalidArgError, val: 66 if sys.version_info < (2, 6) : 67 self.assertEqual(val[0], db.EINVAL) 68 else : 69 self.assertEqual(val.args[0], db.EINVAL) 70 if verbose: print val 71 else: 72 self.fail("expected exception") 73 74 # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2) 75 try: 76 d.has_key(0) 77 except db.DBError, val: 78 pass 79 else: 80 self.fail("has_key did not raise a proper exception") 81 82 try: 83 data = d[100] 84 except KeyError: 85 pass 86 else: 87 self.fail("expected exception") 88 89 try: 90 data = d.get(100) 91 except db.DBNotFoundError, val: 92 if get_returns_none: 93 self.fail("unexpected exception") 94 else: 95 self.assertEqual(data, None) 96 97 keys = d.keys() 98 if verbose: 99 print keys 100 self.assertIsInstance(keys, list) 101 self.assertIsInstance(keys[0], int) 102 self.assertEqual(len(keys), len(d)) 103 104 items = d.items() 105 if verbose: 106 pprint(items) 107 self.assertIsInstance(items, list) 108 self.assertIsInstance(items[0], tuple) 109 self.assertEqual(len(items[0]), 2) 110 self.assertIsInstance(items[0][0], int) 111 self.assertIsInstance(items[0][1], str) 112 self.assertEqual(len(items), len(d)) 113 114 self.assertTrue(d.has_key(25)) 115 116 del d[25] 117 self.assertFalse(d.has_key(25)) 118 119 d.delete(13) 120 self.assertFalse(d.has_key(13)) 121 122 data = d.get_both(26, "z" * 60) 123 self.assertEqual(data, "z" * 60, 'was %r' % data) 124 if verbose: 125 print data 126 127 fd = d.fd() 128 if verbose: 129 print fd 130 131 c = d.cursor() 132 rec = c.first() 133 while rec: 134 if verbose: 135 print rec 136 rec = c.next() 137 138 c.set(50) 139 rec = c.current() 140 if verbose: 141 print rec 142 143 c.put(-1, "a replacement record", db.DB_CURRENT) 144 145 c.set(50) 146 rec = c.current() 147 self.assertEqual(rec, (50, "a replacement record")) 148 if verbose: 149 print rec 150 151 rec = c.set_range(30) 152 if verbose: 153 print rec 154 155 # test that non-existent key lookups work (and that 156 # DBC_set_range doesn't have a memleak under valgrind) 157 rec = c.set_range(999999) 158 self.assertEqual(rec, None) 159 if verbose: 160 print rec 161 162 c.close() 163 d.close() 164 165 d = db.DB() 166 d.open(self.filename) 167 c = d.cursor() 168 169 # put a record beyond the consecutive end of the recno's 170 d[100] = "way out there" 171 self.assertEqual(d[100], "way out there") 172 173 try: 174 data = d[99] 175 except KeyError: 176 pass 177 else: 178 self.fail("expected exception") 179 180 try: 181 d.get(99) 182 except db.DBKeyEmptyError, val: 183 if get_returns_none: 184 self.fail("unexpected DBKeyEmptyError exception") 185 else: 186 if sys.version_info < (2, 6) : 187 self.assertEqual(val[0], db.DB_KEYEMPTY) 188 else : 189 self.assertEqual(val.args[0], db.DB_KEYEMPTY) 190 if verbose: print val 191 else: 192 if not get_returns_none: 193 self.fail("expected exception") 194 195 rec = c.set(40) 196 while rec: 197 if verbose: 198 print rec 199 rec = c.next() 200 201 c.close() 202 d.close() 203 204 def test02_WithSource(self): 205 """ 206 A Recno file that is given a "backing source file" is essentially a 207 simple ASCII file. Normally each record is delimited by \n and so is 208 just a line in the file, but you can set a different record delimiter 209 if needed. 210 """ 211 homeDir = get_new_environment_path() 212 self.homeDir = homeDir 213 source = os.path.join(homeDir, 'test_recno.txt') 214 if not os.path.isdir(homeDir): 215 os.mkdir(homeDir) 216 f = open(source, 'w') # create the file 217 f.close() 218 219 d = db.DB() 220 # This is the default value, just checking if both int 221 d.set_re_delim(0x0A) 222 d.set_re_delim('\n') # and char can be used... 223 d.set_re_source(source) 224 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 225 226 data = "The quick brown fox jumped over the lazy dog".split() 227 for datum in data: 228 d.append(datum) 229 d.sync() 230 d.close() 231 232 # get the text from the backing source 233 f = open(source, 'r') 234 text = f.read() 235 f.close() 236 text = text.strip() 237 if verbose: 238 print text 239 print data 240 print text.split('\n') 241 242 self.assertEqual(text.split('\n'), data) 243 244 # open as a DB again 245 d = db.DB() 246 d.set_re_source(source) 247 d.open(self.filename, db.DB_RECNO) 248 249 d[3] = 'reddish-brown' 250 d[8] = 'comatose' 251 252 d.sync() 253 d.close() 254 255 f = open(source, 'r') 256 text = f.read() 257 f.close() 258 text = text.strip() 259 if verbose: 260 print text 261 print text.split('\n') 262 263 self.assertEqual(text.split('\n'), 264 "The quick reddish-brown fox jumped over the comatose dog".split()) 265 266 def test03_FixedLength(self): 267 d = db.DB() 268 d.set_re_len(40) # fixed length records, 40 bytes long 269 d.set_re_pad('-') # sets the pad character... 270 d.set_re_pad(45) # ...test both int and char 271 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 272 273 for x in letters: 274 d.append(x * 35) # These will be padded 275 276 d.append('.' * 40) # this one will be exact 277 278 try: # this one will fail 279 d.append('bad' * 20) 280 except db.DBInvalidArgError, val: 281 if sys.version_info < (2, 6) : 282 self.assertEqual(val[0], db.EINVAL) 283 else : 284 self.assertEqual(val.args[0], db.EINVAL) 285 if verbose: print val 286 else: 287 self.fail("expected exception") 288 289 c = d.cursor() 290 rec = c.first() 291 while rec: 292 if verbose: 293 print rec 294 rec = c.next() 295 296 c.close() 297 d.close() 298 299 def test04_get_size_empty(self) : 300 d = db.DB() 301 d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE) 302 303 row_id = d.append(' ') 304 self.assertEqual(1, d.get_size(key=row_id)) 305 row_id = d.append('') 306 self.assertEqual(0, d.get_size(key=row_id)) 307 308 309 310 311 312 #---------------------------------------------------------------------- 313 314 315 def test_suite(): 316 return unittest.makeSuite(SimpleRecnoTestCase) 317 318 319 if __name__ == '__main__': 320 unittest.main(defaultTest='test_suite') 321