Home | History | Annotate | Download | only in test
      1 import unittest
      2 import os
      3 
      4 from test_all import db, test_support, get_new_environment_path, get_new_database_path
      5 
      6 
      7 class DBSequenceTest(unittest.TestCase):
      8     def setUp(self):
      9         self.int_32_max = 0x100000000
     10         self.homeDir = get_new_environment_path()
     11         self.filename = "test"
     12 
     13         self.dbenv = db.DBEnv()
     14         self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
     15         self.d = db.DB(self.dbenv)
     16         self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)
     17 
     18     def tearDown(self):
     19         if hasattr(self, 'seq'):
     20             self.seq.close()
     21             del self.seq
     22         if hasattr(self, 'd'):
     23             self.d.close()
     24             del self.d
     25         if hasattr(self, 'dbenv'):
     26             self.dbenv.close()
     27             del self.dbenv
     28 
     29         test_support.rmtree(self.homeDir)
     30 
     31     def test_get(self):
     32         self.seq = db.DBSequence(self.d, flags=0)
     33         start_value = 10 * self.int_32_max
     34         self.assertEqual(0xA00000000, start_value)
     35         self.assertEqual(None, self.seq.initial_value(start_value))
     36         self.assertEqual(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
     37         self.assertEqual(start_value, self.seq.get(5))
     38         self.assertEqual(start_value + 5, self.seq.get())
     39 
     40     def test_remove(self):
     41         self.seq = db.DBSequence(self.d, flags=0)
     42         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     43         self.assertEqual(None, self.seq.remove(txn=None, flags=0))
     44         del self.seq
     45 
     46     def test_get_key(self):
     47         self.seq = db.DBSequence(self.d, flags=0)
     48         key = 'foo'
     49         self.assertEqual(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
     50         self.assertEqual(key, self.seq.get_key())
     51 
     52     def test_get_dbp(self):
     53         self.seq = db.DBSequence(self.d, flags=0)
     54         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     55         self.assertEqual(self.d, self.seq.get_dbp())
     56 
     57     def test_cachesize(self):
     58         self.seq = db.DBSequence(self.d, flags=0)
     59         cashe_size = 10
     60         self.assertEqual(None, self.seq.set_cachesize(cashe_size))
     61         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     62         self.assertEqual(cashe_size, self.seq.get_cachesize())
     63 
     64     def test_flags(self):
     65         self.seq = db.DBSequence(self.d, flags=0)
     66         flag = db.DB_SEQ_WRAP;
     67         self.assertEqual(None, self.seq.set_flags(flag))
     68         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     69         self.assertEqual(flag, self.seq.get_flags() & flag)
     70 
     71     def test_range(self):
     72         self.seq = db.DBSequence(self.d, flags=0)
     73         seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
     74         self.assertEqual(None, self.seq.set_range(seq_range))
     75         self.seq.initial_value(seq_range[0])
     76         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     77         self.assertEqual(seq_range, self.seq.get_range())
     78 
     79     def test_stat(self):
     80         self.seq = db.DBSequence(self.d, flags=0)
     81         self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
     82         stat = self.seq.stat()
     83         for param in ('nowait', 'min', 'max', 'value', 'current',
     84                       'flags', 'cache_size', 'last_value', 'wait'):
     85             self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
     86 
     87     if db.version() >= (4,7) :
     88         # This code checks a crash solved in Berkeley DB 4.7
     89         def test_stat_crash(self) :
     90             d=db.DB()
     91             d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE)  # In RAM
     92             seq = db.DBSequence(d, flags=0)
     93 
     94             self.assertRaises(db.DBNotFoundError, seq.open,
     95                     key='id', txn=None, flags=0)
     96 
     97             self.assertRaises(db.DBInvalidArgError, seq.stat)
     98 
     99             d.close()
    100 
    101     def test_64bits(self) :
    102         # We don't use both extremes because they are problematic
    103         value_plus=(1L<<63)-2
    104         self.assertEqual(9223372036854775806L,value_plus)
    105         value_minus=(-1L<<63)+1  # Two complement
    106         self.assertEqual(-9223372036854775807L,value_minus)
    107         self.seq = db.DBSequence(self.d, flags=0)
    108         self.assertEqual(None, self.seq.initial_value(value_plus-1))
    109         self.assertEqual(None, self.seq.open(key='id', txn=None,
    110             flags=db.DB_CREATE))
    111         self.assertEqual(value_plus-1, self.seq.get(1))
    112         self.assertEqual(value_plus, self.seq.get(1))
    113 
    114         self.seq.remove(txn=None, flags=0)
    115 
    116         self.seq = db.DBSequence(self.d, flags=0)
    117         self.assertEqual(None, self.seq.initial_value(value_minus))
    118         self.assertEqual(None, self.seq.open(key='id', txn=None,
    119             flags=db.DB_CREATE))
    120         self.assertEqual(value_minus, self.seq.get(1))
    121         self.assertEqual(value_minus+1, self.seq.get(1))
    122 
    123     def test_multiple_close(self):
    124         self.seq = db.DBSequence(self.d)
    125         self.seq.close()  # You can close a Sequence multiple times
    126         self.seq.close()
    127         self.seq.close()
    128 
    129 def test_suite():
    130     suite = unittest.TestSuite()
    131     suite.addTest(unittest.makeSuite(DBSequenceTest))
    132     return suite
    133 
    134 
    135 if __name__ == '__main__':
    136     unittest.main(defaultTest='test_suite')
    137