Home | History | Annotate | Download | only in test
      1 """Miscellaneous bsddb module test cases
      2 """
      3 
      4 import os, sys
      5 import unittest
      6 
      7 from test_all import db, dbshelve, hashopen, test_support, get_new_environment_path, get_new_database_path
      8 
      9 #----------------------------------------------------------------------
     10 
     11 class MiscTestCase(unittest.TestCase):
     12     def setUp(self):
     13         self.filename = get_new_database_path()
     14         self.homeDir = get_new_environment_path()
     15 
     16     def tearDown(self):
     17         test_support.unlink(self.filename)
     18         test_support.rmtree(self.homeDir)
     19 
     20     def test01_badpointer(self):
     21         dbs = dbshelve.open(self.filename)
     22         dbs.close()
     23         self.assertRaises(db.DBError, dbs.get, "foo")
     24 
     25     def test02_db_home(self):
     26         env = db.DBEnv()
     27         # check for crash fixed when db_home is used before open()
     28         self.assertTrue(env.db_home is None)
     29         env.open(self.homeDir, db.DB_CREATE)
     30         if sys.version_info[0] < 3 :
     31             self.assertEqual(self.homeDir, env.db_home)
     32         else :
     33             self.assertEqual(bytes(self.homeDir, "ascii"), env.db_home)
     34 
     35     def test03_repr_closed_db(self):
     36         db = hashopen(self.filename)
     37         db.close()
     38         rp = repr(db)
     39         self.assertEqual(rp, "{}")
     40 
     41     def test04_repr_db(self) :
     42         db = hashopen(self.filename)
     43         d = {}
     44         for i in xrange(100) :
     45             db[repr(i)] = repr(100*i)
     46             d[repr(i)] = repr(100*i)
     47         db.close()
     48         db = hashopen(self.filename)
     49         rp = repr(db)
     50         self.assertEqual(rp, repr(d))
     51         db.close()
     52 
     53     # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
     54     #
     55     # See the bug report for details.
     56     #
     57     # The problem was that make_key_dbt() was not allocating a copy of
     58     # string keys but FREE_DBT() was always being told to free it when the
     59     # database was opened with DB_THREAD.
     60     def test05_double_free_make_key_dbt(self):
     61         try:
     62             db1 = db.DB()
     63             db1.open(self.filename, None, db.DB_BTREE,
     64                      db.DB_CREATE | db.DB_THREAD)
     65 
     66             curs = db1.cursor()
     67             t = curs.get("/foo", db.DB_SET)
     68             # double free happened during exit from DBC_get
     69         finally:
     70             db1.close()
     71             test_support.unlink(self.filename)
     72 
     73     def test06_key_with_null_bytes(self):
     74         try:
     75             db1 = db.DB()
     76             db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
     77             db1['a'] = 'eh?'
     78             db1['a\x00'] = 'eh zed.'
     79             db1['a\x00a'] = 'eh zed eh?'
     80             db1['aaa'] = 'eh eh eh!'
     81             keys = db1.keys()
     82             keys.sort()
     83             self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys)
     84             self.assertEqual(db1['a'], 'eh?')
     85             self.assertEqual(db1['a\x00'], 'eh zed.')
     86             self.assertEqual(db1['a\x00a'], 'eh zed eh?')
     87             self.assertEqual(db1['aaa'], 'eh eh eh!')
     88         finally:
     89             db1.close()
     90             test_support.unlink(self.filename)
     91 
     92     def test07_DB_set_flags_persists(self):
     93         try:
     94             db1 = db.DB()
     95             db1.set_flags(db.DB_DUPSORT)
     96             db1.open(self.filename, db.DB_HASH, db.DB_CREATE)
     97             db1['a'] = 'eh'
     98             db1['a'] = 'A'
     99             self.assertEqual([('a', 'A')], db1.items())
    100             db1.put('a', 'Aa')
    101             self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
    102             db1.close()
    103             db1 = db.DB()
    104             # no set_flags call, we're testing that it reads and obeys
    105             # the flags on open.
    106             db1.open(self.filename, db.DB_HASH)
    107             self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
    108             # if it read the flags right this will replace all values
    109             # for key 'a' instead of adding a new one.  (as a dict should)
    110             db1['a'] = 'new A'
    111             self.assertEqual([('a', 'new A')], db1.items())
    112         finally:
    113             db1.close()
    114             test_support.unlink(self.filename)
    115 
    116 
    117     def test08_ExceptionTypes(self) :
    118         self.assertTrue(issubclass(db.DBError, Exception))
    119         for i, j in db.__dict__.items() :
    120             if i.startswith("DB") and i.endswith("Error") :
    121                 self.assertTrue(issubclass(j, db.DBError), msg=i)
    122                 if i not in ("DBKeyEmptyError", "DBNotFoundError") :
    123                     self.assertFalse(issubclass(j, KeyError), msg=i)
    124 
    125         # This two exceptions have two bases
    126         self.assertTrue(issubclass(db.DBKeyEmptyError, KeyError))
    127         self.assertTrue(issubclass(db.DBNotFoundError, KeyError))
    128 
    129 
    130 #----------------------------------------------------------------------
    131 
    132 
    133 def test_suite():
    134     return unittest.makeSuite(MiscTestCase)
    135 
    136 
    137 if __name__ == '__main__':
    138     unittest.main(defaultTest='test_suite')
    139